Skip to content

Commit

Permalink
Cache HelmRepository index files
Browse files Browse the repository at this point in the history
If implemented, this will provide users with a way to cache index files.

This addresses issues where the index file is loaded and unmarshalled in
concurrent reconciliations, resulting in a heavy memory footprint.

The caching strategy used is cache aside, and the cache is a k/v store
with expiration.

The maximum number of cache entries and the TTL of entries are configurable.

The cache is optional and is disabled by default.

Signed-off-by: Soule BA <soule@weave.works>
  • Loading branch information
souleb committed Mar 21, 2022
1 parent 62b5007 commit b36c837
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 4 deletions.
3 changes: 3 additions & 0 deletions api/v1beta2/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,7 @@ const (

// SymlinkUpdateFailedReason signals a failure in updating a symlink.
SymlinkUpdateFailedReason string = "SymlinkUpdateFailed"

// CacheOperationFailedReason signals a failure in cache operation.
CacheOperationFailedReason string = "CacheOperationFailed"
)
34 changes: 34 additions & 0 deletions controllers/helmchart_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

securejoin "github.com/cyphar/filepath-securejoin"
helmgetter "helm.sh/helm/v3/pkg/getter"
helmrepo "helm.sh/helm/v3/pkg/repo"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -55,6 +56,7 @@ import (
"github.com/fluxcd/pkg/untar"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/helm/chart"
"github.com/fluxcd/source-controller/internal/helm/getter"
Expand Down Expand Up @@ -109,6 +111,9 @@ type HelmChartReconciler struct {
Storage *Storage
Getters helmgetter.Providers
ControllerName string

Cache *cache.Cache
TTL time.Duration
}

func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -451,6 +456,15 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
}
}

// Try to retrieve the repository index from the cache
if r.Cache != nil {
if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); err == nil {
if found {
chartRepo.Index = index.(*helmrepo.IndexFile)
}
}
}

// Construct the chart builder with scoped configuration
cb := chart.NewRemoteBuilder(chartRepo)
opts := chart.BuildOptions{
Expand All @@ -474,6 +488,26 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
return sreconcile.ResultEmpty, err
}

defer func() {
// Cache the index if it was successfully retrieved
// and the chart was successfully built
if r.Cache != nil && chartRepo.Index != nil {
// The cache key has to be safe in multi-tenancy environments,
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
// Using r.Storage.LocalPath(*repo.GetArtifact()) is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
if err != nil {
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %v", err)
}

}

// Delete the index reference
if chartRepo.Index != nil {
chartRepo.Unload()
}
}()

*b = *build
return sreconcile.ResultSuccess, nil
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/fluxcd/pkg/testserver"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -126,12 +127,15 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}

cache := cache.New(5, 1*time.Second)
if err := (&HelmChartReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),
Metrics: testMetricsH,
Getters: testGetters,
Storage: testStorage,
Cache: cache,
TTL: 1 * time.Second,
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}
Expand Down
234 changes: 234 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"
"runtime"
"sync"
"time"
)

// NOTE: this is heavily based on patrickmn/go-cache:
// https://github.com/patrickmn/go-cache

// Cache is a thread-safe in-memory key/value store.
// It embeds the unexported cache, so all of that type's methods are
// promoted and callable directly on a *Cache.
type Cache struct {
	*cache
}

// Item is an item stored in the cache.
type Item struct {
	// Object is the cached value.
	Object interface{}
	// Expiration is the expiry instant as a Unix timestamp in nanoseconds.
	// A value of zero means the item never expires.
	Expiration int64
}

// cache is the unexported implementation embedded by Cache: a map of
// items guarded by a read/write mutex.
type cache struct {
	// Items holds the elements in the cache.
	Items map[string]Item
	// Maximum number of items the cache can hold.
	// Set and Add only insert a new key while MaxItems is positive and the
	// map holds fewer than MaxItems entries.
	MaxItems int
	// mu guards access to Items.
	mu sync.RWMutex
	// janitor holds the settings of the periodic clean-up of expired items.
	janitor *janitor
}

// ItemCount reports how many items are currently held by the cache.
// Expired items that have not been removed yet are included in the count.
func (c *cache) ItemCount() int {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return len(c.Items)
}

// set stores value under key without taking the lock; callers must hold
// c.mu for writing. A positive expiration is converted to an absolute
// expiry timestamp; otherwise the item is stored with no expiry.
func (c *cache) set(key string, value interface{}, expiration time.Duration) {
	entry := Item{Object: value}
	if expiration > 0 {
		entry.Expiration = time.Now().Add(expiration).UnixNano()
	}
	c.Items[key] = entry
}

// Set stores value under key, overwriting any item already stored there.
// A zero expiration means the item never expires.
// A key that is not yet present is only inserted while the cache has room
// left; otherwise Set returns an error and the cache is left untouched.
func (c *cache) Set(key string, value interface{}, expiration time.Duration) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	_, exists := c.Items[key]
	hasRoom := c.MaxItems > 0 && len(c.Items) < c.MaxItems
	if !exists && !hasRoom {
		return fmt.Errorf("Cache is full")
	}

	c.set(key, value, expiration)
	return nil
}

// Add stores value under key only if the key is not present yet.
// It returns an error when the key already exists, or when the cache has no
// room left for a new item. A zero expiration means the item never expires.
func (c *cache) Add(key string, value interface{}, expiration time.Duration) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, exists := c.Items[key]; exists {
		return fmt.Errorf("Item %s already exists", key)
	}
	if c.MaxItems <= 0 || len(c.Items) >= c.MaxItems {
		return fmt.Errorf("Cache is full")
	}

	c.set(key, value, expiration)
	return nil
}

// Get looks up key and returns the stored object together with true.
// It returns nil and false when the key is absent or when the item has an
// expiry timestamp that lies in the past. Expired items are not removed here.
func (c *cache) Get(key string) (interface{}, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	entry, ok := c.Items[key]
	if !ok {
		return nil, false
	}
	if entry.Expiration > 0 && entry.Expiration < time.Now().UnixNano() {
		return nil, false
	}
	return entry.Object, true
}

// Delete removes the item stored under key, if any.
func (c *cache) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.Items, key)
}

// Clear drops every item by replacing the underlying map with a new,
// empty one.
func (c *cache) Clear() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.Items = map[string]Item{}
}

// HasExpired reports whether key is unusable: it returns true when the key
// is absent or when its expiry timestamp lies in the past, and false
// otherwise. Items without an expiry never expire.
func (c *cache) HasExpired(key string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	entry, ok := c.Items[key]
	if !ok {
		return true
	}
	return entry.Expiration > 0 && entry.Expiration < time.Now().UnixNano()
}

// SetExpiration replaces the expiry of the item stored under key with
// "now + expiration". It does nothing when the key is not in the cache.
//
// Unlike Set, the duration is always applied relative to the current time,
// so a non-positive expiration yields a timestamp that is not in the future.
func (c *cache) SetExpiration(key string, expiration time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	item, ok := c.Items[key]
	if !ok {
		return
	}
	item.Expiration = time.Now().Add(expiration).UnixNano()
	// Map values are copies: the updated item has to be written back,
	// otherwise the new expiry is silently lost.
	c.Items[key] = item
}

// GetExpiration returns the time left before the item stored under key
// expires.
//
// It returns zero when the key is not in the cache, when the item has
// already expired, and when the item has no expiry at all. Use HasExpired to
// tell a never-expiring item apart from a missing or expired one.
// The result is never negative.
func (c *cache) GetExpiration(key string) time.Duration {
	c.mu.RLock()
	item, ok := c.Items[key]
	c.mu.RUnlock()

	// An Expiration of zero marks an item that never expires; subtracting the
	// current time from it would yield a meaningless negative duration.
	if !ok || item.Expiration <= 0 {
		return 0
	}

	// Read the clock once so the expiry check and the returned value agree.
	remaining := item.Expiration - time.Now().UnixNano()
	if remaining <= 0 {
		return 0
	}
	return time.Duration(remaining)
}

// DeleteExpired walks the whole cache and removes every item whose expiry
// timestamp lies in the past. Items without an expiry are kept.
func (c *cache) DeleteExpired() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for key, entry := range c.Items {
		if entry.Expiration <= 0 {
			// No expiry set: the item lives until it is deleted explicitly.
			continue
		}
		if entry.Expiration < time.Now().UnixNano() {
			delete(c.Items, key)
		}
	}
}

// janitor periodically purges expired items from a cache.
type janitor struct {
	// Interval is the time between two clean-up passes.
	Interval time.Duration
	// stop makes Run return when a value is sent on it.
	stop chan bool
}

// Run deletes the expired items of c every j.Interval. It blocks until a
// value is received on the janitor's stop channel, and releases the ticker
// before returning.
func (j *janitor) Run(c *cache) {
	ticker := time.NewTicker(j.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-j.stop:
			return
		case <-ticker.C:
			c.DeleteExpired()
		}
	}
}

// stopJanitor signals the janitor of c to stop by sending on its stop
// channel. New registers it as the finalizer of the returned Cache.
func stopJanitor(c *Cache) {
	j := c.janitor
	j.stop <- true
}

// New creates a Cache that holds at most maxItems items.
//
// When interval is positive, a janitor goroutine is started that deletes
// expired items every interval, and a finalizer is registered on the
// returned Cache to stop that goroutine once the Cache is garbage collected.
// When interval is zero or negative, no janitor is started and expired items
// are never removed automatically.
func New(maxItems int, interval time.Duration) *Cache {
	j := &janitor{
		Interval: interval,
		stop:     make(chan bool),
	}
	inner := &cache{
		Items:    make(map[string]Item),
		MaxItems: maxItems,
		janitor:  j,
	}
	wrapper := &Cache{inner}

	if interval <= 0 {
		return wrapper
	}

	// The goroutine only references the inner cache, so the wrapper can
	// still be collected and trigger the finalizer.
	go j.Run(inner)
	runtime.SetFinalizer(wrapper, stopJanitor)
	return wrapper
}
Loading

0 comments on commit b36c837

Please sign in to comment.