This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Split Warmer.warm() into smaller functions #1935

Merged
merged 2 commits on Apr 15, 2019
268 changes: 268 additions & 0 deletions registry/cache/repocachemanager.go
@@ -0,0 +1,268 @@
package cache

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/pkg/errors"

	"github.com/weaveworks/flux/image"
	"github.com/weaveworks/flux/registry"
)

type imageToUpdate struct {
	ref             image.Ref
	previousDigest  string
	previousRefresh time.Duration
}

// repoCacheManager handles cache operations for a container image repository
type repoCacheManager struct {
	now         time.Time
	repoID      image.Name
	burst       int
	trace       bool
	logger      log.Logger
	cacheClient Client
	sync.Mutex
}

func newRepoCacheManager(now time.Time, repoID image.Name, burst int, trace bool, logger log.Logger,
	cacheClient Client) *repoCacheManager {
	return &repoCacheManager{
		now:         now,
		repoID:      repoID,
		burst:       burst,
		trace:       trace,
		logger:      logger,
		cacheClient: cacheClient,
	}
}

// fetchRepository fetches the repository from the cache
func (c *repoCacheManager) fetchRepository() (ImageRepository, error) {
	var result ImageRepository
	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
	bytes, _, err := c.cacheClient.GetKey(repoKey)
	if err != nil {
		return ImageRepository{}, err
	}
	if err = json.Unmarshal(bytes, &result); err != nil {
		return ImageRepository{}, err
	}
	return result, nil
}

// storeRepository stores the repository in the cache
func (c *repoCacheManager) storeRepository(repo ImageRepository) error {
	repoKey := NewRepositoryKey(c.repoID.CanonicalName())
	bytes, err := json.Marshal(repo)
	if err != nil {
		return err
	}
	return c.cacheClient.SetKey(repoKey, c.now.Add(repoRefresh), bytes)
}
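
The two helpers above lean on a small key-value contract: GetKey hands back the stored bytes together with their refresh deadline, and SetKey writes bytes under a key with a deadline. A minimal, self-contained sketch of that contract, using a hypothetical in-memory stand-in for the package's Client (not part of this PR):

```go
// Hypothetical in-memory stand-in for the Client interface used above:
// GetKey returns the stored bytes and their refresh deadline; SetKey
// stores bytes under a key with a deadline.
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrNotCached = errors.New("item not in cache")

type entry struct {
	bytes    []byte
	deadline time.Time
}

type memCache map[string]entry

func (m memCache) GetKey(k string) ([]byte, time.Time, error) {
	e, ok := m[k]
	if !ok {
		return nil, time.Time{}, ErrNotCached
	}
	return e.bytes, e.deadline, nil
}

func (m memCache) SetKey(k string, deadline time.Time, v []byte) error {
	m[k] = entry{bytes: v, deadline: deadline}
	return nil
}

func main() {
	c := memCache{}
	_ = c.SetKey("repo:index.docker.io/library/alpine", time.Now().Add(time.Hour), []byte(`{"tags":["3.9"]}`))
	b, deadline, _ := c.GetKey("repo:index.docker.io/library/alpine")
	fmt.Println(string(b), "refresh due", deadline.Format(time.RFC3339))
}
```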

// fetchImagesResult is the result of fetching images from the cache
// invariant: len(imagesToUpdate) == imagesToUpdateRefreshCount + imagesToUpdateMissingCount
type fetchImagesResult struct {
	imagesFound                map[string]image.Info // images found in the cache
	imagesToUpdate             []imageToUpdate       // images which need to be updated
	imagesToUpdateRefreshCount int                   // number of imagesToUpdate which need updating due to their cache entry expiring
	imagesToUpdateMissingCount int                   // number of imagesToUpdate which need updating due to being missing
}

// fetchImages attempts to fetch the images with the provided tags from the cache.
// It returns the images found, those which require updating and details about
// why they need to be updated.
func (c *repoCacheManager) fetchImages(tags []string) (fetchImagesResult, error) {
	images := map[string]image.Info{}

	// Create a list of images that need updating
	var toUpdate []imageToUpdate

	// Counters for reporting what happened
	var missing, refresh int
	for _, tag := range tags {
		if tag == "" {
			return fetchImagesResult{}, fmt.Errorf("empty tag in fetched tags")
		}

		// See if we have the manifest already cached
		newID := c.repoID.ToRef(tag)
		key := NewManifestKey(newID.CanonicalRef())
		bytes, deadline, err := c.cacheClient.GetKey(key)
		// If err, then we don't have it yet. Update.
		switch {
		case err != nil: // by and large these are cache misses, but any error shall count as "not found"
			if err != ErrNotCached {
				c.logger.Log("warning", "error from cache", "err", err, "ref", newID)
			}
			missing++
			toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: initialRefresh})
		case len(bytes) == 0:
			c.logger.Log("warning", "empty result from cache", "ref", newID)
			missing++
			toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: initialRefresh})
		default:
			var entry registry.ImageEntry
			if err := json.Unmarshal(bytes, &entry); err == nil {
				if c.trace {
					c.logger.Log("trace", "found cached manifest", "ref", newID, "last_fetched", entry.LastFetched.Format(time.RFC3339), "deadline", deadline.Format(time.RFC3339))
				}

				if entry.ExcludedReason == "" {
					images[tag] = entry.Info
					if c.now.After(deadline) {
						previousRefresh := minRefresh
						lastFetched := entry.Info.LastFetched
						if !lastFetched.IsZero() {
							previousRefresh = deadline.Sub(lastFetched)
						}
						toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: previousRefresh, previousDigest: entry.Info.Digest})
						refresh++
					}
				} else {
					if c.trace {
						c.logger.Log("trace", "excluded in cache", "ref", newID, "reason", entry.ExcludedReason)
					}
					if c.now.After(deadline) {
						toUpdate = append(toUpdate, imageToUpdate{ref: newID, previousRefresh: excludedRefresh})
						refresh++
					}
				}
			}
		}
	}

	result := fetchImagesResult{
		imagesFound:                images,
		imagesToUpdate:             toUpdate,
		imagesToUpdateRefreshCount: refresh,
		imagesToUpdateMissingCount: missing,
	}

	return result, nil
}
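
The invariant documented on fetchImagesResult follows directly from the loop: every append to toUpdate is paired with exactly one missing++ or refresh++. A self-contained sketch with stand-in types (mirroring, not reusing, the ones above):

```go
package main

import "fmt"

// Stand-in mirroring fetchImagesResult, for illustration only.
type result struct {
	imagesToUpdate             []string
	imagesToUpdateRefreshCount int
	imagesToUpdateMissingCount int
}

func main() {
	res := result{
		imagesToUpdate:             []string{"1.1", "2.0"}, // hypothetical tags
		imagesToUpdateRefreshCount: 1,                      // cache entry expired
		imagesToUpdateMissingCount: 1,                      // never cached, or cache error
	}
	// invariant: len(imagesToUpdate) == refresh count + missing count
	fmt.Println(len(res.imagesToUpdate) == res.imagesToUpdateRefreshCount+res.imagesToUpdateMissingCount)
}
```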

// updateImages refreshes the cache entries for the images passed. It may not succeed for all images.
// It returns the values stored in cache, the number of images it succeeded for and the number
// of images whose manifest wasn't found in the registry.
func (c *repoCacheManager) updateImages(ctx context.Context, registryClient registry.Client, images []imageToUpdate) (map[string]image.Info, int, int) {
Member:
Just a note; I am not a fan of the .., int, int return, but wrapping it in a struct to give it more context does not seem very meaningful either. The comment makes up for it a bit, though.

@2opremio (Contributor, Author), Apr 15, 2019:
Yeah, I am not too pleased with this or fetchImagesResult either. Happy to look into any suggestions you may have. Maybe computing the counters outside of the functions? I gave that a short chance but it was hairy. I will give it another try.

@2opremio (Contributor, Author):
I didn't find a cleaner way. At least not without a much bigger refactoring.
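
For comparison, one shape the wrapped return discussed in this thread could take; updateImagesResult is hypothetical and not part of this PR:

```go
// Hypothetical alternative to the (map[string]image.Info, int, int)
// return; sketched here only to make the trade-off concrete.
type updateImagesResult struct {
	imagesUpdated        map[string]image.Info // entries successfully refreshed and stored
	successCount         int                   // images fetched without error
	manifestUnknownCount int                   // manifests missing from the registry
}
```

The counters would still have to be maintained in the same places, which matches the thread's conclusion that the struct adds context but little simplification.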

	// The upper bound for concurrent fetches against a single host is
	// c.burst, so limit the number of fetching goroutines to that.
	fetchers := make(chan struct{}, c.burst)
	awaitFetchers := &sync.WaitGroup{}

	ctxc, cancel := context.WithCancel(ctx)
	defer cancel()

	var successCount int
	var manifestUnknownCount int
	var result = map[string]image.Info{}
	var warnAboutRateLimit sync.Once
updates:
	for _, up := range images {
		// copy the loop variable to avoid a race condition when accessing it in the goroutine
		upCopy := up
		select {
		case <-ctxc.Done():
			break updates
		case fetchers <- struct{}{}:
		}
		awaitFetchers.Add(1)
		go func() {
			defer func() { awaitFetchers.Done(); <-fetchers }()
			entry, err := c.updateImage(ctxc, registryClient, upCopy)
			if err != nil {
				if err, ok := errors.Cause(err).(net.Error); ok && err.Timeout() {
					// This was due to a context timeout, don't bother logging
					return
				}
				switch {
				case strings.Contains(err.Error(), "429"):
					// abort fetching the image tags if we've been rate limited
					warnAboutRateLimit.Do(func() {
						c.logger.Log("warn", "aborting image tag fetching due to rate limiting, will try again later")
						cancel()
					})
				case strings.Contains(err.Error(), "manifest unknown"):
					// Registry is corrupted; keep going, this manifest may not be relevant for automatic updates
					c.Lock()
					manifestUnknownCount++
					c.Unlock()
					c.logger.Log("warn", fmt.Sprintf("manifest for tag %s missing in repository %s", upCopy.ref.Tag, upCopy.ref.Name),
						"impact", "flux will fail to auto-release workloads with matching images, ask the repository administrator to fix the inconsistency")
				default:
					c.logger.Log("err", err, "ref", upCopy.ref)
				}
				return
			}
			c.Lock()
			successCount++
			if entry.ExcludedReason == "" {
				result[upCopy.ref.Tag] = entry.Info
			}
			c.Unlock()
		}()
	}
	awaitFetchers.Wait()
	return result, successCount, manifestUnknownCount
}
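
The concurrency cap in updateImages is the common buffered-channel-as-semaphore pattern: each send acquires one of burst tokens, the deferred receive releases it, and the WaitGroup waits for the stragglers. A self-contained sketch of just that pattern (names and timings are illustrative):

```go
// Burst-limited fan-out: a buffered channel caps in-flight goroutines,
// and the WaitGroup blocks until all of them drain.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const burst = 2
	jobs := []string{"a", "b", "c", "d", "e"}

	sem := make(chan struct{}, burst) // at most `burst` tokens in flight
	var wg sync.WaitGroup

	for _, job := range jobs {
		job := job        // copy the loop variable (needed before Go 1.22)
		sem <- struct{}{} // acquire a token; blocks once burst is reached
		wg.Add(1)
		go func() {
			defer func() { wg.Done(); <-sem }() // release the token
			time.Sleep(10 * time.Millisecond)   // stand-in for a registry fetch
			fmt.Println("fetched", job)
		}()
	}
	wg.Wait()
}
```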

func (c *repoCacheManager) updateImage(ctx context.Context, registryClient registry.Client, update imageToUpdate) (registry.ImageEntry, error) {
	imageID := update.ref

	if c.trace {
		c.logger.Log("trace", "refreshing manifest", "ref", imageID, "previous_refresh", update.previousRefresh.String())
	}

	// Get the image from the remote
	entry, err := registryClient.Manifest(ctx, imageID.Tag)
	if err != nil {
		return registry.ImageEntry{}, err
	}

	refresh := update.previousRefresh
	reason := ""
	switch {
	case entry.ExcludedReason != "":
		c.logger.Log("excluded", entry.ExcludedReason, "ref", imageID)
		refresh = excludedRefresh
		reason = "image is excluded"
	case update.previousDigest == "":
		entry.Info.LastFetched = c.now
		refresh = update.previousRefresh
		reason = "no prior cache entry for image"
	case entry.Info.Digest == update.previousDigest:
		entry.Info.LastFetched = c.now
		refresh = clipRefresh(refresh * 2)
		reason = "image digest is same"
	default: // i.e., not excluded, but the digests differ -> the tag was moved
		entry.Info.LastFetched = c.now
		refresh = clipRefresh(refresh / 2)
		reason = "image digest is different"
	}

	if c.trace {
		c.logger.Log("trace", "caching manifest", "ref", imageID, "last_fetched", c.now.Format(time.RFC3339), "refresh", refresh.String(), "reason", reason)
	}

	key := NewManifestKey(imageID.CanonicalRef())
	// Write back to memcached
	val, err := json.Marshal(entry)
	if err != nil {
		return registry.ImageEntry{}, err
	}
	err = c.cacheClient.SetKey(key, c.now.Add(refresh), val)
	if err != nil {
		return registry.ImageEntry{}, err
	}
	return entry, nil
}
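
The digest comparison in updateImage drives a simple multiplicative back-off on the cache deadline: a stable tag doubles the refresh interval, a moved tag halves it. A standalone sketch, assuming clipRefresh clamps to [minRefresh, maxRefresh] as its name suggests (the real constants live elsewhere in this package; the values below are illustrative):

```go
// Adaptive refresh interval: back off while the tag is stable,
// tighten when the tag moves, always clamped to sane bounds.
package main

import (
	"fmt"
	"time"
)

const (
	minRefresh = 5 * time.Minute // illustrative bounds, not the PR's values
	maxRefresh = 24 * time.Hour
)

func clipRefresh(r time.Duration) time.Duration {
	if r < minRefresh {
		return minRefresh
	}
	if r > maxRefresh {
		return maxRefresh
	}
	return r
}

func main() {
	refresh := minRefresh
	for _, digestUnchanged := range []bool{true, true, true, false} {
		if digestUnchanged {
			refresh = clipRefresh(refresh * 2) // back off while the tag is stable
		} else {
			refresh = clipRefresh(refresh / 2) // tighten when the tag moved
		}
		fmt.Println(refresh)
	}
}
```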