diff --git a/api/v1beta2/condition_types.go b/api/v1beta2/condition_types.go
index 2611cf257..b58c4aaa0 100644
--- a/api/v1beta2/condition_types.go
+++ b/api/v1beta2/condition_types.go
@@ -80,4 +80,7 @@ const (
 
     // SymlinkUpdateFailedReason signals a failure in updating a symlink.
     SymlinkUpdateFailedReason string = "SymlinkUpdateFailed"
+
+    // CacheOperationFailedReason signals a failure in a cache operation.
+    CacheOperationFailedReason string = "CacheOperationFailed"
 )
diff --git a/controllers/.DS_Store b/controllers/.DS_Store
new file mode 100644
index 000000000..b6c2cc44c
Binary files /dev/null and b/controllers/.DS_Store differ
diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go
index d6c2e4e11..35574e576 100644
--- a/controllers/helmchart_controller.go
+++ b/controllers/helmchart_controller.go
@@ -30,6 +30,7 @@ import (
 
     securejoin "github.com/cyphar/filepath-securejoin"
     helmgetter "helm.sh/helm/v3/pkg/getter"
+    helmrepo "helm.sh/helm/v3/pkg/repo"
     corev1 "k8s.io/api/core/v1"
     apierrs "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +56,7 @@ import (
     "github.com/fluxcd/pkg/untar"
 
     sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
+    "github.com/fluxcd/source-controller/internal/cache"
     serror "github.com/fluxcd/source-controller/internal/error"
     "github.com/fluxcd/source-controller/internal/helm/chart"
     "github.com/fluxcd/source-controller/internal/helm/getter"
@@ -109,6 +111,9 @@ type HelmChartReconciler struct {
     Storage        *Storage
     Getters        helmgetter.Providers
     ControllerName string
+
+    Cache *cache.Cache
+    TTL   time.Duration
 }
 
 func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
@@ -451,6 +456,15 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
         }
     }
 
+    // Try to retrieve the repository index from the cache.
+    if r.Cache != nil {
+        if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); found {
+            chartRepo.Index = index.(*helmrepo.IndexFile)
+        }
+    }
+
     // Construct the chart builder with scoped configuration
     cb := chart.NewRemoteBuilder(chartRepo)
     opts := chart.BuildOptions{
@@ -474,6 +488,23 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
         return sreconcile.ResultEmpty, err
     }
 
+    defer func() {
+        // Cache the index if it was successfully retrieved
+        // and the chart was successfully built.
+        if r.Cache != nil && chartRepo.Index != nil {
+            err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
+            if err != nil {
+                r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %v", err)
+            }
+        }
+
+        // Release the in-memory index reference.
+        if chartRepo.Index != nil {
+            chartRepo.Unload()
+        }
+    }()
+
     *b = *build
     return sreconcile.ResultSuccess, nil
 }
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index a585eeddd..d61015b91 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -35,6 +35,7 @@ import (
     "github.com/fluxcd/pkg/testserver"
 
     sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
+    "github.com/fluxcd/source-controller/internal/cache"
     // +kubebuilder:scaffold:imports
 )
 
@@ -126,12 +127,15 @@ func TestMain(m *testing.M) {
         panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
     }
 
+    cache := cache.New(5, 1*time.Second)
     if err := (&HelmChartReconciler{
         Client:        testEnv,
         EventRecorder: record.NewFakeRecorder(32),
         Metrics:       testMetricsH,
         Getters:       testGetters,
         Storage:       testStorage,
+        Cache:         cache,
+        TTL:           1 * time.Second,
     }).SetupWithManager(testEnv); err != nil {
         panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
     }
diff --git a/internal/cache/cache.go b/internal/cache/cache.go
new file mode 100644
index 000000000..eae74de0f
--- /dev/null
+++ b/internal/cache/cache.go
@@ -0,0 +1,218 @@
+package cache
+
+import (
+    "fmt"
+    "runtime"
+    "sync"
+    "time"
+)
+
+// NOTE: this is heavily based on patrickmn/go-cache:
+// https://github.com/patrickmn/go-cache
+
+// Cache is a thread-safe in-memory key/value store.
+type Cache struct {
+    *cache
+}
+
+// Item is an item stored in the cache.
+type Item struct {
+    Object     interface{}
+    Expiration int64
+}
+
+type cache struct {
+    // Items holds the elements in the cache.
+    Items map[string]Item
+    // Maximum number of items the cache can hold.
+    MaxItems int
+    mu       sync.RWMutex
+    janitor  *janitor
+}
+
+// ItemCount returns the number of items in the cache.
+// This may include items that have expired, but have not yet been cleaned up.
+func (c *cache) ItemCount() int {
+    c.mu.RLock()
+    n := len(c.Items)
+    c.mu.RUnlock()
+    return n
+}
+
+func (c *cache) set(key string, value interface{}, expiration time.Duration) {
+    var e int64
+    if expiration > 0 {
+        e = time.Now().Add(expiration).UnixNano()
+    }
+
+    c.Items[key] = Item{
+        Object:     value,
+        Expiration: e,
+    }
+}
+
+// Set adds an item to the cache, replacing any existing item.
+// If expiration is zero, the item never expires.
+// If the cache is full, Set returns an error.
+func (c *cache) Set(key string, value interface{}, expiration time.Duration) error {
+    c.mu.Lock()
+    _, found := c.Items[key]
+    if found {
+        c.set(key, value, expiration)
+        c.mu.Unlock()
+        return nil
+    }
+
+    if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
+        c.set(key, value, expiration)
+        c.mu.Unlock()
+        return nil
+    }
+
+    c.mu.Unlock()
+    return fmt.Errorf("cache is full")
+}
+
+// Add adds an item to the cache only if the key does not exist yet.
+// If expiration is zero, the item never expires.
+// If the key already exists, or the cache is full, Add returns an error.
+func (c *cache) Add(key string, value interface{}, expiration time.Duration) error {
+    c.mu.Lock()
+    _, found := c.Items[key]
+    if found {
+        c.mu.Unlock()
+        return fmt.Errorf("item %s already exists", key)
+    }
+
+    if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
+        c.set(key, value, expiration)
+        c.mu.Unlock()
+        return nil
+    }
+
+    c.mu.Unlock()
+    return fmt.Errorf("cache is full")
+}
+
+// Get returns the item stored under key, and a boolean indicating
+// whether the key was found. Expired items are treated as not found.
+func (c *cache) Get(key string) (interface{}, bool) {
+    c.mu.RLock()
+    item, found := c.Items[key]
+    if !found {
+        c.mu.RUnlock()
+        return nil, false
+    }
+    if item.Expiration > 0 {
+        if item.Expiration < time.Now().UnixNano() {
+            c.mu.RUnlock()
+            return nil, false
+        }
+    }
+    c.mu.RUnlock()
+    return item.Object, true
+}
+
+// Delete removes the item with the given key from the cache, if present.
+func (c *cache) Delete(key string) {
+    c.mu.Lock()
+    delete(c.Items, key)
+    c.mu.Unlock()
+}
+
+// Clear removes all items from the cache.
+func (c *cache) Clear() {
+    c.mu.Lock()
+    c.Items = make(map[string]Item)
+    c.mu.Unlock()
+}
+
+// HasExpired returns true if the item with the given key has expired,
+// or if the key is not present in the cache.
+func (c *cache) HasExpired(key string) bool {
+    c.mu.RLock()
+    item, ok := c.Items[key]
+    if !ok {
+        c.mu.RUnlock()
+        return true
+    }
+    if item.Expiration > 0 {
+        if item.Expiration < time.Now().UnixNano() {
+            c.mu.RUnlock()
+            return true
+        }
+    }
+    c.mu.RUnlock()
+    return false
+}
+
+// SetExpiration updates the expiration of the item with the given key,
+// if it exists, to now plus the given duration.
+func (c *cache) SetExpiration(key string, expiration time.Duration) {
+    c.mu.Lock()
+    item, ok := c.Items[key]
+    if !ok {
+        c.mu.Unlock()
+        return
+    }
+    item.Expiration = time.Now().Add(expiration).UnixNano()
+    c.Items[key] = item
+    c.mu.Unlock()
+}
+
+// GetExpiration returns the remaining time to live of the item with the
+// given key, or zero if the key is not present or the item has expired.
+func (c *cache) GetExpiration(key string) time.Duration {
+    c.mu.RLock()
+    item, ok := c.Items[key]
+    if !ok {
+        c.mu.RUnlock()
+        return 0
+    }
+    if item.Expiration > 0 {
+        if item.Expiration < time.Now().UnixNano() {
+            c.mu.RUnlock()
+            return 0
+        }
+    }
+    c.mu.RUnlock()
+    return time.Duration(item.Expiration - time.Now().UnixNano())
+}
+
+// DeleteExpired removes all expired items from the cache.
+func (c *cache) DeleteExpired() {
+    c.mu.Lock()
+    for k, v := range c.Items {
+        if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() {
+            delete(c.Items, k)
+        }
+    }
+    c.mu.Unlock()
+}
+
+type janitor struct {
+    Interval time.Duration
+    stop     chan bool
+}
+
+// Run periodically deletes expired items until it is stopped.
+func (j *janitor) Run(c *cache) {
+    ticker := time.NewTicker(j.Interval)
+    for {
+        select {
+        case <-ticker.C:
+            c.DeleteExpired()
+        case <-j.stop:
+            ticker.Stop()
+            return
+        }
+    }
+}
+
+func stopJanitor(c *Cache) {
+    c.janitor.stop <- true
+}
+
+// New creates a new cache that holds at most maxItems items. If interval
+// is greater than zero, a janitor goroutine purges expired items at that
+// interval until the Cache is garbage collected.
+func New(maxItems int, interval time.Duration) *Cache {
+    c := &cache{
+        Items:    make(map[string]Item),
+        MaxItems: maxItems,
+        janitor: &janitor{
+            Interval: interval,
+            stop:     make(chan bool),
+        },
+    }
+
+    C := &Cache{c}
+
+    if interval > 0 {
+        go c.janitor.Run(c)
+        runtime.SetFinalizer(C, stopJanitor)
+    }
+
+    return C
+}
diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go
new file mode 100644
index 000000000..d737b5384
--- /dev/null
+++ b/internal/cache/cache_test.go
@@ -0,0 +1,71 @@
+package cache
+
+import (
+    "testing"
+    "time"
+
+    . "github.com/onsi/gomega"
+)
+
+func TestCache(t *testing.T) {
+    g := NewWithT(t)
+    // Create a cache that can hold 2 items and has no cleanup interval.
+    cache := New(2, 0)
+
+    // Get an item from the empty cache.
+    if _, found := cache.Get("key1"); found {
+        t.Error("item should not be found")
+    }
+
+    // Add an item to the cache.
+    err := cache.Add("key1", "value1", 0)
+    g.Expect(err).ToNot(HaveOccurred())
+
+    // Get the item from the cache.
+    item, found := cache.Get("key1")
+    g.Expect(found).To(BeTrue())
+    g.Expect(item).To(Equal("value1"))
+
+    // Add a second item to the cache.
+    err = cache.Add("key2", "value2", 0)
+    g.Expect(err).ToNot(HaveOccurred())
+    g.Expect(cache.ItemCount()).To(Equal(2))
+
+    // Get the item from the cache.
+    item, found = cache.Get("key2")
+    g.Expect(found).To(BeTrue())
+    g.Expect(item).To(Equal("value2"))
+
+    // Adding a third item should fail, as the cache is full.
+    err = cache.Add("key3", "value3", 0)
+    g.Expect(err).To(HaveOccurred())
+
+    // Replace an existing item in the cache.
+    err = cache.Set("key2", "value3", 0)
+    g.Expect(err).ToNot(HaveOccurred())
+
+    // Get the item from the cache.
+    item, found = cache.Get("key2")
+    g.Expect(found).To(BeTrue())
+    g.Expect(item).To(Equal("value3"))
+
+    // New cache with a cleanup interval of 1 second.
+    cache = New(2, 1*time.Second)
+
+    // Add an item with a TTL of 2 seconds.
+    err = cache.Add("key1", "value1", 2*time.Second)
+    g.Expect(err).ToNot(HaveOccurred())
+
+    // Get the item from the cache.
+    item, found = cache.Get("key1")
+    g.Expect(found).To(BeTrue())
+    g.Expect(item).To(Equal("value1"))
+
+    // Wait for the item to expire.
+    time.Sleep(3 * time.Second)
+
+    // The item should no longer be found.
+    item, found = cache.Get("key1")
+    g.Expect(found).To(BeFalse())
+    g.Expect(item).To(BeNil())
+}
diff --git a/internal/helm/chart/builder_remote.go b/internal/helm/chart/builder_remote.go
index 778efd253..b3594cefb 100644
--- a/internal/helm/chart/builder_remote.go
+++ b/internal/helm/chart/builder_remote.go
@@ -72,11 +72,13 @@ func (b *remoteChartBuilder) Build(_ context.Context, ref Reference, p string, o
         return nil, &BuildError{Reason: ErrChartReference, Err: err}
     }
 
-    if err := b.remote.LoadFromCache(); err != nil {
-        err = fmt.Errorf("could not load repository index for remote chart reference: %w", err)
-        return nil, &BuildError{Reason: ErrChartPull, Err: err}
+    // Load the repository index if not already present.
+    if b.remote.Index == nil {
+        if err := b.remote.LoadFromCache(); err != nil {
+            err = fmt.Errorf("could not load repository index for remote chart reference: %w", err)
+            return nil, &BuildError{Reason: ErrChartPull, Err: err}
+        }
     }
-    defer b.remote.Unload()
 
     // Get the current version for the RemoteReference
     cv, err := b.remote.Get(remoteRef.Name, remoteRef.Version)
diff --git a/main.go b/main.go
index 120c83d5d..53a19ea66 100644
--- a/main.go
+++ b/main.go
@@ -44,6 +44,7 @@ import (
 
     sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
     "github.com/fluxcd/source-controller/controllers"
+    "github.com/fluxcd/source-controller/internal/cache"
     "github.com/fluxcd/source-controller/internal/helm"
     "github.com/fluxcd/source-controller/pkg/git/libgit2/managed"
     // +kubebuilder:scaffold:imports
@@ -86,6 +87,9 @@ func main() {
         clientOptions         client.Options
         logOptions            logger.Options
         leaderElectionOptions leaderelection.Options
+        cacheMaxSize          int
+        cacheTTL              string
+        cachePurgeInterval    string
     )
 
     flag.StringVar(&metricsAddr, "metrics-addr", envOrDefault("METRICS_ADDR", ":8080"),
@@ -110,6 +114,12 @@
         "The max allowed size in bytes of a file in a Helm chart.")
     flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second,
         "The interval at which failing dependencies are reevaluated.")
+    flag.IntVar(&cacheMaxSize, "cache-max-size", 0,
+        "The maximum size of the cache in number of items.")
+    flag.StringVar(&cacheTTL, "cache-ttl", "15m",
+        "The TTL of an item in the cache. Valid time units are ns, us (or µs), ms, s, m, h.")
+    flag.StringVar(&cachePurgeInterval, "cache-purge-interval", "1m",
+        "The interval at which the cache is purged. Valid time units are ns, us (or µs), ms, s, m, h.")
     clientOptions.BindFlags(flag.CommandLine)
     logOptions.BindFlags(flag.CommandLine)
@@ -191,6 +201,24 @@
         setupLog.Error(err, "unable to create controller", "controller", sourcev1.HelmRepositoryKind)
         os.Exit(1)
     }
+
+    var c *cache.Cache
+    var ttl time.Duration
+    if cacheMaxSize > 0 {
+        interval, err := time.ParseDuration(cachePurgeInterval)
+        if err != nil {
+            setupLog.Error(err, "unable to parse cache purge interval")
+            os.Exit(1)
+        }
+
+        ttl, err = time.ParseDuration(cacheTTL)
+        if err != nil {
+            setupLog.Error(err, "unable to parse cache TTL")
+            os.Exit(1)
+        }
+
+        c = cache.New(cacheMaxSize, interval)
+    }
     if err = (&controllers.HelmChartReconciler{
         Client:         mgr.GetClient(),
         Storage:        storage,
@@ -198,6 +226,8 @@
         EventRecorder:  eventRecorder,
         Metrics:        metricsH,
         ControllerName: controllerName,
+        Cache:          c,
+        TTL:            ttl,
     }).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
         MaxConcurrentReconciles: concurrent,
     }); err != nil {
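Note: as a usage reference, the sketch below exercises the `internal/cache` API introduced in this diff outside of the controller. It is a minimal, standalone example under stated assumptions: the key `"index.yaml"` and the string value are illustrative stand-ins for the storage artifact path and the `*repo.IndexFile` that `HelmChartReconciler` actually caches, and the max-size/TTL values are arbitrary rather than taken from the flags above.

```go
package main

import (
	"fmt"
	"time"

	"github.com/fluxcd/source-controller/internal/cache"
)

func main() {
	// Bounded cache: at most 10 items; a janitor purges expired entries every minute.
	c := cache.New(10, 1*time.Minute)

	// Store a value under a key with a 15m TTL, mirroring the reconciler's
	// r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL).
	// The key and value here are placeholders, not the real artifact path or index.
	if err := c.Set("index.yaml", "repository-index-placeholder", 15*time.Minute); err != nil {
		fmt.Println("set failed:", err) // e.g. "cache is full"
	}

	// On a later reconciliation, try the cache before loading the index from storage.
	if v, found := c.Get("index.yaml"); found {
		fmt.Println("cache hit:", v)
	} else {
		fmt.Println("cache miss: load the index from storage and Set it again")
	}
}
```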