diff --git a/pkg/controllers/openapi/aggregator/aggregator.go b/pkg/controllers/openapi/aggregator/aggregator.go index 8491293e..43571893 100644 --- a/pkg/controllers/openapi/aggregator/aggregator.go +++ b/pkg/controllers/openapi/aggregator/aggregator.go @@ -67,7 +67,9 @@ type openAPISpecInfo struct { // The downloader is used only for non-local apiservices to // re-update the spec every so often. - downloader cached.Data[*spec.Swagger] + // Calling Get() is not thread safe and should only be called by a single + // thread via the openapi controller. + downloader CacheableDownloader } type specAggregator struct { @@ -93,8 +95,7 @@ func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggr for i, handler := range delegationHandlers { name := fmt.Sprintf(localDelegateChainNamePattern, i+1) - spec := NewCacheableDownloader(downloader, handler) - spec = decorateError(name, spec) + spec := NewCacheableDownloader(name, downloader, handler) s.addLocalSpec(name, spec) } @@ -218,16 +219,21 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler s.mutex.Lock() defer s.mutex.Unlock() - _, exists := s.specsByAPIServiceName[apiService.Name] + existingSpec, exists := s.specsByAPIServiceName[apiService.Name] if !exists { - s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{ + specInfo := &openAPISpecInfo{ apiService: *apiService, - downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)), + downloader: NewCacheableDownloader(apiService.Name, s.downloader, handler), } + specInfo.spec.Replace(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)}) + s.specsByAPIServiceName[apiService.Name] = specInfo s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) + } else { + existingSpec.apiService = *apiService + existingSpec.downloader.UpdateHandler(handler) } - return s.updateServiceLocked(apiService.Name) + return nil } // RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned. @@ -243,14 +249,3 @@ func (s *specAggregator) RemoveAPIService(apiServiceName string) { // Re-create the mergeSpec for the new list of apiservices s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked()) } - -// decorateError creates a new cache that wraps a downloader -// cache the name of the apiservice to help with debugging. -func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] { - return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] { - if result.Err != nil { - return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err)) - } - return result - }, cache) -} diff --git a/pkg/controllers/openapi/aggregator/aggregator_test.go b/pkg/controllers/openapi/aggregator/aggregator_test.go index c5ad4e15..1b366e12 100644 --- a/pkg/controllers/openapi/aggregator/aggregator_test.go +++ b/pkg/controllers/openapi/aggregator/aggregator_test.go @@ -25,6 +25,7 @@ import ( "time" "bytes" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/validation/spec" @@ -90,6 +91,9 @@ func TestAddUpdateAPIService(t *testing.T) { if err := s.AddUpdateAPIService(apiService, handler); err != nil { t.Error(err) } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -109,7 +113,9 @@ func TestAddUpdateAPIService(t *testing.T) { }, }, } - s.UpdateAPIServiceSpec(apiService.Name) + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } swagger, err = fetchOpenAPI(mux) if err != nil { @@ -158,6 +164,9 @@ func TestAddRemoveAPIService(t *testing.T) { if err := s.AddUpdateAPIService(apiService, handler); err != nil { t.Error(err) } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -178,6 +187,78 @@ func TestAddRemoveAPIService(t *testing.T) { expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") } +func TestUpdateAPIService(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiService := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiService.Name = "apiservice" + + handler := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/apiservicegroup/v1": {}, + }, + }, + }, + }} + + handler2 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{}, + }, + }, + }} + + if err := s.AddUpdateAPIService(apiService, handler); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") + + t.Logf("Updating APIService %s", apiService.Name) + if err := s.AddUpdateAPIService(apiService, handler2); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil { + t.Error(err) + } + + swagger, err = fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + // Ensure that the if the APIService is added and then handler is modified, the new data is reflected in the aggregated OpenAPI. + expectNoPath(t, swagger, "/apis/apiservicegroup/v1") + expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1") +} + func TestFailingAPIServiceSkippedAggregation(t *testing.T) { mux := http.NewServeMux() var delegationHandlers []http.Handler @@ -233,8 +314,19 @@ func TestFailingAPIServiceSkippedAggregation(t *testing.T) { }, } - s.AddUpdateAPIService(apiServiceFailed, handlerFailed) - s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess) + if err := s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess); err != nil { + t.Error(err) + } + if err := s.AddUpdateAPIService(apiServiceFailed, handlerFailed); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiServiceSuccess.Name); err != nil { + t.Error(err) + } + err := s.UpdateAPIServiceSpec(apiServiceFailed.Name) + if err == nil { + t.Errorf("Expected updating failing apiService %s to return error", apiServiceFailed.Name) + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -281,7 +373,12 @@ func TestAPIServiceFailSuccessTransition(t *testing.T) { }, } - s.AddUpdateAPIService(apiService, handler) + if err := s.AddUpdateAPIService(apiService, handler); err != nil { + t.Error(err) + } + if err := s.UpdateAPIServiceSpec(apiService.Name); err == nil { + t.Errorf("Expected error for when updating spec for failing apiservice") + } swagger, err := fetchOpenAPI(mux) if err != nil { @@ -304,12 +401,75 @@ func TestAPIServiceFailSuccessTransition(t *testing.T) { expectPath(t, swagger, "/apis/apiservicegroup/v1") } +func TestFailingAPIServiceDoesNotBlockAdd(t *testing.T) { + mux := http.NewServeMux() + var delegationHandlers []http.Handler + delegate1 := &openAPIHandler{openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/foo/v1": {}, + }, + }, + }, + }} + delegationHandlers = append(delegationHandlers, delegate1) + + s := buildAndRegisterSpecAggregator(delegationHandlers, mux) + + apiServiceFailed := &v1.APIService{ + Spec: v1.APIServiceSpec{ + Service: &v1.ServiceReference{Name: "dummy"}, + }, + } + apiServiceFailed.Name = "apiserviceFailed" + + // Create a handler that has a long response time and ensure that + // adding the APIService does not block. + handlerFailed := &openAPIHandler{ + delaySeconds: 5, + returnErr: true, + openapi: &spec.Swagger{ + SwaggerProps: spec.SwaggerProps{ + Paths: &spec.Paths{ + Paths: map[string]spec.PathItem{ + "/apis/failed/v1": {}, + }, + }, + }, + }, + } + + updateDone := make(chan bool) + go func() { + if err := s.AddUpdateAPIService(apiServiceFailed, handlerFailed); err != nil { + t.Error(err) + } + close(updateDone) + }() + + select { + case <-updateDone: + case <-time.After(2 * time.Second): + t.Errorf("AddUpdateAPIService affected by APIService response time") + } + + swagger, err := fetchOpenAPI(mux) + if err != nil { + t.Error(err) + } + expectPath(t, swagger, "/apis/foo/v1") + expectNoPath(t, swagger, "/apis/failed/v1") +} + type openAPIHandler struct { - openapi *spec.Swagger - returnErr bool + delaySeconds int + openapi *spec.Swagger + returnErr bool } func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + time.Sleep(time.Duration(o.delaySeconds) * time.Second) if o.returnErr { w.WriteHeader(500) return diff --git a/pkg/controllers/openapi/aggregator/downloader.go b/pkg/controllers/openapi/aggregator/downloader.go index 3098f593..f3e2a2ff 100644 --- a/pkg/controllers/openapi/aggregator/downloader.go +++ b/pkg/controllers/openapi/aggregator/downloader.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strings" + "sync/atomic" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" @@ -28,25 +29,46 @@ import ( "k8s.io/kube-openapi/pkg/validation/spec" ) +type CacheableDownloader interface { + UpdateHandler(http.Handler) + Get() cached.Result[*spec.Swagger] +} + // cacheableDownloader is a downloader that will always return the data // and the etag. type cacheableDownloader struct { + name string downloader *Downloader - handler http.Handler - etag string - spec *spec.Swagger + // handler is the http Handler for the apiservice that can be replaced + handler atomic.Pointer[http.Handler] + etag string + spec *spec.Swagger } -// Creates a downloader that also returns the etag, making it useful to use as a cached dependency. -func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] { - return &cacheableDownloader{ +// NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency. +func NewCacheableDownloader(apiServiceName string, downloader *Downloader, handler http.Handler) CacheableDownloader { + c := &cacheableDownloader{ + name: apiServiceName, downloader: downloader, - handler: handler, } + c.handler.Store(&handler) + return c +} +func (d *cacheableDownloader) UpdateHandler(handler http.Handler) { + d.handler.Store(&handler) } func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] { - swagger, etag, status, err := d.downloader.Download(d.handler, d.etag) + r := d.get() + if r.Err != nil { + return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", d.name, r.Err)) + } + return r +} + +func (d *cacheableDownloader) get() cached.Result[*spec.Swagger] { + h := *d.handler.Load() + swagger, etag, status, err := d.downloader.Download(h, d.etag) if err != nil { return cached.NewResultErr[*spec.Swagger](err) } diff --git a/pkg/controllers/openapi/controller.go b/pkg/controllers/openapi/controller.go index 5f107150..69f32f4a 100644 --- a/pkg/controllers/openapi/controller.go +++ b/pkg/controllers/openapi/controller.go @@ -139,7 +139,10 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService * // UpdateAPIService updates API Service's info and handler. func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) { - if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil { + if apiService.Spec.Service == nil { + return + } + if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(apiService.Name); err != nil { utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err)) } key := apiService.Name