Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Clarify which fields of cache.Warmer are required
Browse files Browse the repository at this point in the history
The cache.Warmer struct has some mandatory fields and some
optional. With a constructor we can better enforce this.

In particular, the cache client implementation, which is now mandatory
but was still described as optional in examples and help text.

We can simplify (and in examples, omit) the default values by assuming
memcached is in the same namespace.
  • Loading branch information
squaremo committed Dec 18, 2017
1 parent 7fc6f25 commit 1a06c22
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 71 deletions.
39 changes: 18 additions & 21 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {

gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period at which to poll git repo for new commits")
// registry
memcachedHostname = fs.String("memcached-hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.")
memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
Expand Down Expand Up @@ -235,19 +235,17 @@ func main() {
{
// Cache client, for use by registry and cache warmer
var cacheClient cache.Client
if *memcachedHostname != "" {
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
MaxIdleConns: *registryBurst,
})
defer memcacheClient.Stop()
cacheClient = cache.InstrumentClient(memcacheClient)
}
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
MaxIdleConns: *registryBurst,
})
defer memcacheClient.Stop()
cacheClient = cache.InstrumentClient(memcacheClient)

cacheRegistry = &cache.Cache{
Reader: cacheClient,
Expand All @@ -266,12 +264,11 @@ func main() {
}

// Warmer
warmerLogger := log.With(logger, "component", "warmer")
cacheWarmer = &cache.Warmer{
Logger: warmerLogger,
ClientFactory: remoteFactory,
Cache: cacheClient,
Burst: *registryBurst,
var err error
cacheWarmer, err = cache.NewWarmer(remoteFactory, cacheClient, *registryBurst)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
}

Expand Down Expand Up @@ -446,7 +443,7 @@ func main() {
cacheWarmer.Notify = daemon.AskForImagePoll
cacheWarmer.Priority = daemon.ImageRefresh
shutdownWg.Add(1)
go cacheWarmer.Loop(shutdown, shutdownWg, image_creds)
go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, image_creds)

// Update daemonRef so that upstream and handlers point to fully working daemon
daemonRef.UpdatePlatform(daemon)
Expand Down
17 changes: 7 additions & 10 deletions deploy/flux-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ spec:
- name: git-key
mountPath: /etc/fluxd/ssh
args:
# if you deployed memcached, you can supply these arguments to
# tell fluxd to use it. You may need to change the namespace
# (`default`) if you run fluxd in another namespace.
- --memcached-hostname=memcached.default.svc.cluster.local
- --memcached-timeout=100ms
- --memcached-service=memcached
- --registry-cache-expiry=20m

# if you deployed memcached in a different namespace to flux,
# or with a different service name, you can supply these
# following two arguments to tell fluxd how to connect to it.
# - --memcached-hostname=memcached.default.svc.cluster.local
# - --memcached-service=memcached

# replace (at least) the following URL
- --git-url=git@github.com:weaveworks/flux-example
- --git-branch=master
# include these next two to connect to an "upstream" service
# (e.g., Weave Cloud). The token is particular to the service.
# - --connect=wss://cloud.weave.works/api/flux
# - --token=abc123abc123abc123abc123
# override -b and -t arguments to ssh-keygen
# - --ssh-keygen-bits=2048
- --ssh-keygen-type=ed25519
10 changes: 2 additions & 8 deletions registry/cache/memcached/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,7 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {

r := &cache.Cache{mc}

w := &cache.Warmer{
Logger: log.With(logger, "component", "warmer"),
ClientFactory: remote,
Cache: mc,
Burst: 125,
}

w, _ := cache.NewWarmer(remote, mc, 125)
shutdown := make(chan struct{})
shutdownWg := &sync.WaitGroup{}
defer func() {
Expand All @@ -57,7 +51,7 @@ func TestWarming_WarmerWriteCacheRead(t *testing.T) {
}()

shutdownWg.Add(1)
go w.Loop(shutdown, shutdownWg, func() registry.ImageCreds {
go w.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, func() registry.ImageCreds {
return registry.ImageCreds{
id.Name: registry.NoCredentials(),
}
Expand Down
68 changes: 38 additions & 30 deletions registry/cache/warming.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,26 @@ const askForNewImagesInterval = time.Minute
// Warmer refreshes the information kept in the cache from remote
// registries.
type Warmer struct {
Logger log.Logger
ClientFactory registry.ClientFactory
Cache Client
Burst int
clientFactory registry.ClientFactory
cache Client
burst int
Priority chan image.Name
Notify func()
}

// NewWarmer creates cache warmer that (when Loop is invoked) will
// periodically refresh the values kept in the cache.
func NewWarmer(cf registry.ClientFactory, cacheClient Client, burst int) (*Warmer, error) {
if cf == nil || cacheClient == nil || burst <= 0 {
return nil, errors.New("arguments must be non-nil (or > 0 in the case of burst)")
}
return &Warmer{
clientFactory: cf,
cache: cacheClient,
burst: burst,
}, nil
}

// .. and this is what we keep in the backlog
type backlogItem struct {
image.Name
Expand All @@ -36,13 +48,9 @@ type backlogItem struct {

// Continuously get the images to populate the cache with, and
// populate the cache with them.
func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
func (w *Warmer) Loop(logger log.Logger, stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFunc func() registry.ImageCreds) {
defer wg.Done()

if w.Logger == nil || w.ClientFactory == nil || w.Cache == nil {
panic("registry.Warmer fields are nil")
}

refresh := time.Tick(askForNewImagesInterval)
imageCreds := imagesToFetchFunc()
backlog := imageCredsToBacklog(imageCreds)
Expand All @@ -61,17 +69,17 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
for {
select {
case <-stop:
w.Logger.Log("stopping", "true")
logger.Log("stopping", "true")
return
case name := <-w.Priority:
w.Logger.Log("priority", name.String())
logger.Log("priority", name.String())
// NB the implicit contract here is that the prioritised
// image has to have been running the last time we
// requested the credentials.
if creds, ok := imageCreds[name]; ok {
w.warm(ctx, name, creds)
w.warm(ctx, logger, name, creds)
} else {
w.Logger.Log("priority", name.String(), "err", "no creds available")
logger.Log("priority", name.String(), "err", "no creds available")
}
continue
default:
Expand All @@ -80,7 +88,7 @@ func (w *Warmer) Loop(stop <-chan struct{}, wg *sync.WaitGroup, imagesToFetchFun
if len(backlog) > 0 {
im := backlog[0]
backlog = backlog[1:]
w.warm(ctx, im.Name, im.Credentials)
w.warm(ctx, logger, im.Name, im.Credentials)
} else {
select {
case <-refresh:
Expand All @@ -102,25 +110,25 @@ func imageCredsToBacklog(imageCreds registry.ImageCreds) []backlogItem {
return backlog
}

func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credentials) {
client, err := w.ClientFactory.ClientFor(id.CanonicalName(), creds)
func (w *Warmer) warm(ctx context.Context, logger log.Logger, id image.Name, creds registry.Credentials) {
client, err := w.clientFactory.ClientFor(id.CanonicalName(), creds)
if err != nil {
w.Logger.Log("err", err.Error())
logger.Log("err", err.Error())
return
}

// This is what we're going to write back to the cache
var repo ImageRepository
repoKey := NewRepositoryKey(id.CanonicalName())
bytes, _, err := w.Cache.GetKey(repoKey)
bytes, _, err := w.cache.GetKey(repoKey)
if err == nil {
err = json.Unmarshal(bytes, &repo)
} else if err == ErrNotCached {
err = nil
}

if err != nil {
w.Logger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
logger.Log("err", errors.Wrap(err, "fetching previous result from cache"))
return
}

Expand All @@ -133,17 +141,17 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
defer func() {
bytes, err := json.Marshal(repo)
if err == nil {
err = w.Cache.SetKey(repoKey, bytes)
err = w.cache.SetKey(repoKey, bytes)
}
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "writing result to cache"))
logger.Log("err", errors.Wrap(err, "writing result to cache"))
}
}()

tags, err := client.Tags(ctx)
if err != nil {
if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) && !strings.Contains(err.Error(), "net/http: request canceled") {
w.Logger.Log("err", errors.Wrap(err, "requesting tags"))
logger.Log("err", errors.Wrap(err, "requesting tags"))
repo.LastError = err.Error()
}
return
Expand All @@ -158,7 +166,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
// See if we have the manifest already cached
newID := id.ToRef(tag)
key := NewManifestKey(newID.CanonicalRef())
bytes, expiry, err := w.Cache.GetKey(key)
bytes, expiry, err := w.cache.GetKey(key)
// If err, then we don't have it yet. Update.
switch {
case err != nil:
Expand All @@ -179,12 +187,12 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
var successCount int

if len(toUpdate) > 0 {
w.Logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing)
logger.Log("fetching", id.String(), "total", len(toUpdate), "expired", expired, "missing", missing)
var successMx sync.Mutex

// The upper bound for concurrent fetches against a single host is
// w.Burst, so limit the number of fetching goroutines to that.
fetchers := make(chan struct{}, w.Burst)
fetchers := make(chan struct{}, w.burst)
awaitFetchers := &sync.WaitGroup{}
updates:
for _, imID := range toUpdate {
Expand All @@ -204,20 +212,20 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
// This was due to a context timeout, don't bother logging
return
}
w.Logger.Log("err", errors.Wrap(err, "requesting manifests"))
logger.Log("err", errors.Wrap(err, "requesting manifests"))
return
}

key := NewManifestKey(img.ID.CanonicalRef())
// Write back to memcached
val, err := json.Marshal(img)
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "serializing tag to store in cache"))
logger.Log("err", errors.Wrap(err, "serializing tag to store in cache"))
return
}
err = w.Cache.SetKey(key, val)
err = w.cache.SetKey(key, val)
if err != nil {
w.Logger.Log("err", errors.Wrap(err, "storing manifests in cache"))
logger.Log("err", errors.Wrap(err, "storing manifests in cache"))
return
}
successMx.Lock()
Expand All @@ -227,7 +235,7 @@ func (w *Warmer) warm(ctx context.Context, id image.Name, creds registry.Credent
}(imID)
}
awaitFetchers.Wait()
w.Logger.Log("updated", id.String(), "count", successCount)
logger.Log("updated", id.String(), "count", successCount)
}

// We managed to fetch new metadata for everything we were missing
Expand Down
6 changes: 4 additions & 2 deletions registry/cache/warming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func TestWarm(t *testing.T) {
ref, _ := image.ParseRef("example.com/path/image:tag")
repo := ref.Name

logger := log.NewNopLogger()

client := &mock.Client{
TagsFn: func() ([]string, error) {
return []string{"tag"}, nil
Expand All @@ -65,8 +67,8 @@ func TestWarm(t *testing.T) {
}
factory := &mock.ClientFactory{Client: client}
c := &mem{}
warmer := &Warmer{Logger: log.NewNopLogger(), ClientFactory: factory, Cache: c, Burst: 10}
warmer.warm(context.TODO(), repo, registry.NoCredentials())
warmer := &Warmer{clientFactory: factory, cache: c, burst: 10}
warmer.warm(context.TODO(), logger, repo, registry.NoCredentials())

registry := &Cache{Reader: c}
repoInfo, err := registry.GetRepository(ref.Name)
Expand Down

0 comments on commit 1a06c22

Please sign in to comment.