diff --git a/api/v1beta2/condition_types.go b/api/v1beta2/condition_types.go index c35c2c528..711469eb8 100644 --- a/api/v1beta2/condition_types.go +++ b/api/v1beta2/condition_types.go @@ -97,4 +97,7 @@ const ( // ArtifactUpToDateReason signals that an existing Artifact is up-to-date // with the Source. ArtifactUpToDateReason string = "ArtifactUpToDate" + + // CacheOperationFailedReason signals a failure in cache operation. + CacheOperationFailedReason string = "CacheOperationFailed" ) diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index 332e0ca4a..894eb99b6 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -30,6 +30,7 @@ import ( securejoin "github.com/cyphar/filepath-securejoin" helmgetter "helm.sh/helm/v3/pkg/getter" + helmrepo "helm.sh/helm/v3/pkg/repo" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -55,6 +56,7 @@ import ( "github.com/fluxcd/pkg/untar" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/internal/cache" serror "github.com/fluxcd/source-controller/internal/error" "github.com/fluxcd/source-controller/internal/helm/chart" "github.com/fluxcd/source-controller/internal/helm/getter" @@ -111,6 +113,9 @@ type HelmChartReconciler struct { Storage *Storage Getters helmgetter.Providers ControllerName string + + Cache *cache.Cache + TTL time.Duration } func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -456,6 +461,13 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj * } } + // Try to retrieve the repository index from the cache + if r.Cache != nil { + if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); found { + chartRepo.Index = index.(*helmrepo.IndexFile) + } + } + // Construct the chart builder with scoped configuration cb := chart.NewRemoteBuilder(chartRepo) opts := chart.BuildOptions{ @@ -479,6 +491,26 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj * return sreconcile.ResultEmpty, err } + defer func() { + // Cache the index if it was successfully retrieved + // and the chart was successfully built + if r.Cache != nil && chartRepo.Index != nil { + // The cache key have to be safe in multi-tenancy environments, + // as otherwise it could be used as a vector to bypass the helm repository's authentication. + // Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format ///. + err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL) + if err != nil { + r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %s", err) + } + + } + + // Delete the index reference + if chartRepo.Index != nil { + chartRepo.Unload() + } + }() + *b = *build return sreconcile.ResultSuccess, nil } diff --git a/controllers/suite_test.go b/controllers/suite_test.go index a585eeddd..d61015b91 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -35,6 +35,7 @@ import ( "github.com/fluxcd/pkg/testserver" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/fluxcd/source-controller/internal/cache" // +kubebuilder:scaffold:imports ) @@ -126,12 +127,15 @@ func TestMain(m *testing.M) { panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err)) } + cache := cache.New(5, 1*time.Second) if err := (&HelmChartReconciler{ Client: testEnv, EventRecorder: record.NewFakeRecorder(32), Metrics: testMetricsH, Getters: testGetters, Storage: testStorage, + Cache: cache, + TTL: 1 * time.Second, }).SetupWithManager(testEnv); err != nil { panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err)) } diff --git a/docs/spec/v1beta2/helmcharts.md b/docs/spec/v1beta2/helmcharts.md index 8f8f9800a..b423dde6d 100644 --- a/docs/spec/v1beta2/helmcharts.md +++ b/docs/spec/v1beta2/helmcharts.md @@ -390,6 +390,53 @@ Besides being reported in Events, the reconciliation errors are also logged by the controller. The Flux CLI offer commands for filtering the logs for a specific HelmChart, e.g. `flux logs --level=error --kind=HelmChart --name=`. +### Improving resource consumption by enabling the cache + +When using a `HelmRepository` as Source for a `HelmChart`, the controller loads +the repository index in memory to find the latest version of the chart. + +The controller can be configured to cache Helm repository indexes in memory. +The cache is used to avoid loading repository indexes for every `HelmChart` +reconciliation. + +The following flags are provided to enable and configure the cache: +- `helm-cache-max-size`: The maximum size of the cache in number of indexes. + If `0`, then the cache is disabled. +- `helm-cache-ttl`: The TTL of an index in the cache. +- `helm-cache-purge-interval`: The interval at which the cache is purged of + expired items. + +The caching strategy is to pull a repository index from the cache if it is +available, otherwise to load the index, retrieve and build the chart, +then cache the index. The cached index TTL is refreshed every time the +Helm repository index is loaded with the `helm-cache-ttl` value. + +The cache is purged of expired items every `helm-cache-purge-interval`. + +When the cache is full, no more items can be added to the cache, and the +source-controller will report a warning event instead. + +In order to use the cache, set the related flags in the source-controller +Deployment config: + +```yaml + spec: + containers: + - args: + - --watch-all-namespaces + - --log-level=info + - --log-encoding=json + - --enable-leader-election + - --storage-path=/data + - --storage-adv-addr=source-controller.$(RUNTIME_NAMESPACE).svc.cluster.local. + ## Helm cache with up to 10 items, i.e. 10 indexes. + - --helm-cache-max-size=10 + ## TTL of an index is 1 hour. + - --helm-cache-ttl=1h + ## Purge expired index every 10 minutes. + - --helm-cache-purge-interval=10m +``` + ## HelmChart Status ### Artifact diff --git a/internal/cache/LICENSE b/internal/cache/LICENSE new file mode 100644 index 000000000..f49969d7f --- /dev/null +++ b/internal/cache/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2012-2019 Patrick Mylund Nielsen and the go-cache contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 000000000..1c11f09d1 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,246 @@ +// Copyright (c) 2012-2019 Patrick Mylund Nielsen and the go-cache contributors +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Copyright 2022 The FluxCD contributors. All rights reserved. +// This package provides an in-memory cache +// derived from the https://github.com/patrickmn/go-cache +// package +// It has been modified in order to keep a small set of functions +// and to add a maxItems parameter in order to limit the number of, +// and thus the size of, items in the cache. + +package cache + +import ( + "fmt" + "runtime" + "sync" + "time" +) + +// Cache is a thread-safe in-memory key/value store. +type Cache struct { + *cache +} + +// Item is an item stored in the cache. +type Item struct { + // Object is the item's value. + Object interface{} + // Expiration is the item's expiration time. + Expiration int64 +} + +type cache struct { + // Items holds the elements in the cache. + Items map[string]Item + // MaxItems is the maximum number of items the cache can hold. + MaxItems int + mu sync.RWMutex + janitor *janitor +} + +// ItemCount returns the number of items in the cache. +// This may include items that have expired, but have not yet been cleaned up. +func (c *cache) ItemCount() int { + c.mu.RLock() + n := len(c.Items) + c.mu.RUnlock() + return n +} + +func (c *cache) set(key string, value interface{}, expiration time.Duration) { + var e int64 + if expiration > 0 { + e = time.Now().Add(expiration).UnixNano() + } + + c.Items[key] = Item{ + Object: value, + Expiration: e, + } +} + +// Set adds an item to the cache, replacing any existing item. +// If expiration is zero, the item never expires. +// If the cache is full, Set will return an error. +func (c *cache) Set(key string, value interface{}, expiration time.Duration) error { + c.mu.Lock() + _, found := c.Items[key] + if found { + c.set(key, value, expiration) + c.mu.Unlock() + return nil + } + + if c.MaxItems > 0 && len(c.Items) < c.MaxItems { + c.set(key, value, expiration) + c.mu.Unlock() + return nil + } + + c.mu.Unlock() + return fmt.Errorf("Cache is full") +} + +// Add an item to the cache, existing items will not be overwritten. +// To overwrite existing items, use Set. +// If the cache is full, Add will return an error. +func (c *cache) Add(key string, value interface{}, expiration time.Duration) error { + c.mu.Lock() + _, found := c.Items[key] + if found { + c.mu.Unlock() + return fmt.Errorf("Item %s already exists", key) + } + + if c.MaxItems > 0 && len(c.Items) < c.MaxItems { + c.set(key, value, expiration) + c.mu.Unlock() + return nil + } + + c.mu.Unlock() + return fmt.Errorf("Cache is full") +} + +// Get an item from the cache. Returns the item or nil, and a bool indicating +// whether the key was found. +func (c *cache) Get(key string) (interface{}, bool) { + c.mu.RLock() + item, found := c.Items[key] + if !found { + c.mu.RUnlock() + return nil, false + } + if item.Expiration > 0 { + if item.Expiration < time.Now().UnixNano() { + c.mu.RUnlock() + return nil, false + } + } + c.mu.RUnlock() + return item.Object, true +} + +// Delete an item from the cache. Does nothing if the key is not in the cache. +func (c *cache) Delete(key string) { + c.mu.Lock() + delete(c.Items, key) + c.mu.Unlock() +} + +// Clear all items from the cache. +// This reallocate the inderlying array holding the items, +// so that the memory used by the items is reclaimed. +func (c *cache) Clear() { + c.mu.Lock() + c.Items = make(map[string]Item) + c.mu.Unlock() +} + +// HasExpired returns true if the item has expired. +func (c *cache) HasExpired(key string) bool { + c.mu.RLock() + item, ok := c.Items[key] + if !ok { + c.mu.RUnlock() + return true + } + if item.Expiration > 0 { + if item.Expiration < time.Now().UnixNano() { + c.mu.RUnlock() + return true + } + } + c.mu.RUnlock() + return false +} + +// SetExpiration sets the expiration for the given key. +// Does nothing if the key is not in the cache. +func (c *cache) SetExpiration(key string, expiration time.Duration) { + c.mu.Lock() + item, ok := c.Items[key] + if !ok { + c.mu.Unlock() + return + } + item.Expiration = time.Now().Add(expiration).UnixNano() + c.mu.Unlock() +} + +// GetExpiration returns the expiration for the given key. +// Returns zero if the key is not in the cache or the item +// has already expired. +func (c *cache) GetExpiration(key string) time.Duration { + c.mu.RLock() + item, ok := c.Items[key] + if !ok { + c.mu.RUnlock() + return 0 + } + if item.Expiration > 0 { + if item.Expiration < time.Now().UnixNano() { + c.mu.RUnlock() + return 0 + } + } + c.mu.RUnlock() + return time.Duration(item.Expiration - time.Now().UnixNano()) +} + +// DeleteExpired deletes all expired items from the cache. +func (c *cache) DeleteExpired() { + c.mu.Lock() + for k, v := range c.Items { + if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() { + delete(c.Items, k) + } + } + c.mu.Unlock() +} + +type janitor struct { + interval time.Duration + stop chan bool +} + +func (j *janitor) run(c *cache) { + ticker := time.NewTicker(j.interval) + for { + select { + case <-ticker.C: + c.DeleteExpired() + case <-j.stop: + ticker.Stop() + return + } + } +} + +func stopJanitor(c *Cache) { + c.janitor.stop <- true +} + +// New creates a new cache with the given configuration. +func New(maxItems int, interval time.Duration) *Cache { + c := &cache{ + Items: make(map[string]Item), + MaxItems: maxItems, + janitor: &janitor{ + interval: interval, + stop: make(chan bool), + }, + } + + C := &Cache{c} + + if interval > 0 { + go c.janitor.run(c) + runtime.SetFinalizer(C, stopJanitor) + } + + return C +} diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go new file mode 100644 index 000000000..70d87c8ab --- /dev/null +++ b/internal/cache/cache_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + "time" + + . "github.com/onsi/gomega" +) + +func TestCache(t *testing.T) { + g := NewWithT(t) + // create a cache that can hold 2 items and have no cleanup + cache := New(2, 0) + + // Get an Item from the cache + if _, found := cache.Get("key1"); found { + t.Error("Item should not be found") + } + + // Add an item to the cache + err := cache.Add("key1", "value1", 0) + g.Expect(err).ToNot(HaveOccurred()) + + // Get the item from the cache + item, found := cache.Get("key1") + g.Expect(found).To(BeTrue()) + g.Expect(item).To(Equal("value1")) + + // Add another item to the cache + err = cache.Add("key2", "value2", 0) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(cache.ItemCount()).To(Equal(2)) + + // Get the item from the cache + item, found = cache.Get("key2") + g.Expect(found).To(BeTrue()) + g.Expect(item).To(Equal("value2")) + + //Add an item to the cache + err = cache.Add("key3", "value3", 0) + g.Expect(err).To(HaveOccurred()) + + // Replace an item in the cache + err = cache.Set("key2", "value3", 0) + g.Expect(err).ToNot(HaveOccurred()) + + // Get the item from the cache + item, found = cache.Get("key2") + g.Expect(found).To(BeTrue()) + g.Expect(item).To(Equal("value3")) + + // new cache with a cleanup interval of 1 second + cache = New(2, 1*time.Second) + + // Add an item to the cache + err = cache.Add("key1", "value1", 2*time.Second) + g.Expect(err).ToNot(HaveOccurred()) + + // Get the item from the cache + item, found = cache.Get("key1") + g.Expect(found).To(BeTrue()) + g.Expect(item).To(Equal("value1")) + + // wait for the item to expire + time.Sleep(3 * time.Second) + + // Get the item from the cache + item, found = cache.Get("key1") + g.Expect(found).To(BeFalse()) + g.Expect(item).To(BeNil()) +} diff --git a/internal/helm/chart/builder_remote.go b/internal/helm/chart/builder_remote.go index 778efd253..b3594cefb 100644 --- a/internal/helm/chart/builder_remote.go +++ b/internal/helm/chart/builder_remote.go @@ -72,11 +72,13 @@ func (b *remoteChartBuilder) Build(_ context.Context, ref Reference, p string, o return nil, &BuildError{Reason: ErrChartReference, Err: err} } - if err := b.remote.LoadFromCache(); err != nil { - err = fmt.Errorf("could not load repository index for remote chart reference: %w", err) - return nil, &BuildError{Reason: ErrChartPull, Err: err} + // Load the repository index if not already present. + if b.remote.Index == nil { + if err := b.remote.LoadFromCache(); err != nil { + err = fmt.Errorf("could not load repository index for remote chart reference: %w", err) + return nil, &BuildError{Reason: ErrChartPull, Err: err} + } } - defer b.remote.Unload() // Get the current version for the RemoteReference cv, err := b.remote.Get(remoteRef.Name, remoteRef.Version) diff --git a/main.go b/main.go index 0577de4ed..1c398adc3 100644 --- a/main.go +++ b/main.go @@ -44,6 +44,7 @@ import ( sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" "github.com/fluxcd/source-controller/controllers" + "github.com/fluxcd/source-controller/internal/cache" "github.com/fluxcd/source-controller/internal/helm" "github.com/fluxcd/source-controller/pkg/git/libgit2/managed" // +kubebuilder:scaffold:imports @@ -71,21 +72,24 @@ func init() { func main() { var ( - metricsAddr string - eventsAddr string - healthAddr string - storagePath string - storageAddr string - storageAdvAddr string - concurrent int - requeueDependency time.Duration - watchAllNamespaces bool - helmIndexLimit int64 - helmChartLimit int64 - helmChartFileLimit int64 - clientOptions client.Options - logOptions logger.Options - leaderElectionOptions leaderelection.Options + metricsAddr string + eventsAddr string + healthAddr string + storagePath string + storageAddr string + storageAdvAddr string + concurrent int + requeueDependency time.Duration + watchAllNamespaces bool + helmIndexLimit int64 + helmChartLimit int64 + helmChartFileLimit int64 + clientOptions client.Options + logOptions logger.Options + leaderElectionOptions leaderelection.Options + helmCacheMaxSize int + helmCacheTTL string + helmCachePurgeInterval string ) flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"), @@ -110,6 +114,12 @@ func main() { "The max allowed size in bytes of a file in a Helm chart.") flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.") + flag.IntVar(&helmCacheMaxSize, "helm-cache-max-size", 0, + "The maximum size of the cache in number of indexes.") + flag.StringVar(&helmCacheTTL, "helm-cache-ttl", "15m", + "The TTL of an index in the cache. Valid time units are ns, us (or µs), ms, s, m, h.") + flag.StringVar(&helmCachePurgeInterval, "helm-cache-purge-interval", "1m", + "The interval at which the cache is purged. Valid time units are ns, us (or µs), ms, s, m, h.") clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) @@ -191,6 +201,24 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind) os.Exit(1) } + + var c *cache.Cache + var ttl time.Duration + if helmCacheMaxSize > 0 { + interval, err := time.ParseDuration(helmCachePurgeInterval) + if err != nil { + setupLog.Error(err, "unable to parse cache purge interval") + os.Exit(1) + } + + ttl, err = time.ParseDuration(helmCacheTTL) + if err != nil { + setupLog.Error(err, "unable to parse cache TTL") + os.Exit(1) + } + + c = cache.New(helmCacheMaxSize, interval) + } if err = (&controllers.HelmChartReconciler{ Client: mgr.GetClient(), Storage: storage, @@ -198,6 +226,8 @@ func main() { EventRecorder: eventRecorder, Metrics: metricsH, ControllerName: controllerName, + Cache: c, + TTL: ttl, }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{ MaxConcurrentReconciles: concurrent, }); err != nil {