diff --git a/solver/jobs.go b/solver/jobs.go index 7fa06be10..f6bf52c70 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -259,7 +259,6 @@ type SolverOpt struct { ResolveOpFunc ResolveOpFunc DefaultCache CacheManager ResultSource ResultSource - RefIDStore *RefIDStore CommitRefFunc CommitRefFunc IsRunOnceFunc IsRunOnceFunc } @@ -279,7 +278,6 @@ func NewSolver(opts SolverOpt) *Solver { opts.ResolveOpFunc, opts.CommitRefFunc, solver, - opts.RefIDStore, opts.ResultSource, opts.IsRunOnceFunc, ) diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 4c228e354..285f69c26 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -132,13 +132,13 @@ func New(opt Opt) (*Solver, error) { } s.sysSampler = sampler - refIDStore, err := solver.NewRefIDStore(opt.RootDir) + workerSource, err := worker.NewWorkerResultSource(opt.WorkerController, opt.RootDir) if err != nil { return nil, err } sources := worker.NewCombinedResultSource( - worker.NewWorkerResultSource(opt.WorkerController, refIDStore), + workerSource, remoteSource, ) @@ -148,7 +148,6 @@ func New(opt Opt) (*Solver, error) { DefaultCache: opt.CacheManager, ResultSource: sources, CommitRefFunc: worker.FinalizeRef, - RefIDStore: refIDStore, }) return s, nil } diff --git a/solver/simple.go b/solver/simple.go index 813ee144d..ce8481c70 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -6,7 +6,6 @@ import ( "fmt" "hash" "io" - "path/filepath" "sync" "time" @@ -16,7 +15,6 @@ import ( "github.com/moby/buildkit/util/tracing" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" ) const ( @@ -35,6 +33,7 @@ type IsRunOnceFunc func(Vertex, Builder) (bool, error) // Result using a cache key digest. type ResultSource interface { Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error) + Link(ctx context.Context, cacheKey digest.Digest, refID string) error } // runOnceCtrl is a simple wrapper around an LRU cache. It's used to ensure that @@ -67,35 +66,30 @@ func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool { } type simpleSolver struct { - resolveOpFunc ResolveOpFunc - isRunOnceFunc IsRunOnceFunc - commitRefFunc CommitRefFunc - solver *Solver - parallelGuard *parallelGuard - refIDStore *RefIDStore - resultSource ResultSource - cacheKeyManager *cacheKeyManager - runOnceCtrl *runOnceCtrl + resolveOpFunc ResolveOpFunc + isRunOnceFunc IsRunOnceFunc + commitRefFunc CommitRefFunc + solver *Solver + parallelGuard *parallelGuard + resultSource ResultSource + runOnceCtrl *runOnceCtrl } func newSimpleSolver( resolveOpFunc ResolveOpFunc, commitRefFunc CommitRefFunc, solver *Solver, - refIDStore *RefIDStore, resultSource ResultSource, isRunOnceFunc IsRunOnceFunc, ) *simpleSolver { return &simpleSolver{ - cacheKeyManager: newCacheKeyManager(), - parallelGuard: newParallelGuard(parallelGuardWait), - resolveOpFunc: resolveOpFunc, - commitRefFunc: commitRefFunc, - solver: solver, - refIDStore: refIDStore, - resultSource: resultSource, - isRunOnceFunc: isRunOnceFunc, - runOnceCtrl: newRunOnceCtrl(), + parallelGuard: newParallelGuard(parallelGuardWait), + resolveOpFunc: resolveOpFunc, + commitRefFunc: commitRefFunc, + solver: solver, + resultSource: resultSource, + isRunOnceFunc: isRunOnceFunc, + runOnceCtrl: newRunOnceCtrl(), } } @@ -107,13 +101,15 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul var ret Result var expKeys []ExportableCacheKey + cacheKeyMan := newCacheKeyManager() + for _, d := range digests { vertex, ok := vertices[d] if !ok { return nil, errors.Errorf("digest %s not found", d) } - res, cacheKey, err := s.buildOne(ctx, d, vertex, job, e) + res, cacheKey, err := s.buildOne(ctx, cacheKeyMan, d, vertex, job, e) if err != nil { return nil, err } @@ -146,7 +142,7 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul return NewCachedResult(ret, expKeys), nil } -func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) { +func (s *simpleSolver) buildOne(ctx context.Context, cacheKeyMan *cacheKeyManager, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) { st := s.state(vertex, job) // Add cache opts to context as they will be accessed by cache retrieval. @@ -158,13 +154,13 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, "", err } - inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job) + inputs, err := s.preprocessInputs(ctx, cacheKeyMan, st, vertex, cm.CacheMap, job) if err != nil { notifyError(ctx, st, false, err) return nil, "", err } - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d) + cacheKey, err := cacheKeyMan.cacheKey(ctx, d) if err != nil { return nil, "", err } @@ -214,7 +210,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver } } - err = s.refIDStore.Set(ctx, cacheKey, res.ID()) + err = s.resultSource.Link(ctx, cacheKey, res.ID()) if err != nil { return nil, "", err } @@ -306,7 +302,7 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige return ret, vertices } -func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) { +func (s *simpleSolver) preprocessInputs(ctx context.Context, cacheKeyMan *cacheKeyManager, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) { // This struct is used to reconstruct a cache key from an LLB digest & all // parents using consistent digests that depend on the full dependency chain. scm := simpleCacheMap{ @@ -330,7 +326,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V d := in.Vertex.Digest() // Compute a cache key given the LLB digest value. - cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d) + cacheKey, err := cacheKeyMan.cacheKey(ctx, d) if err != nil { return nil, err } @@ -390,7 +386,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V inputs = append(inputs, res) } - s.cacheKeyManager.add(vertex.Digest(), &scm) + cacheKeyMan.add(vertex.Digest(), &scm) return inputs, nil } @@ -523,76 +519,6 @@ func (f *parallelGuard) acquire(ctx context.Context, d digest.Digest) (<-chan st return ch, closer } -// RefIDStore uses a BoltDB database to store links from computed cache keys to -// worker ref IDs. -type RefIDStore struct { - db *bolt.DB - bucketName string - rootDir string -} - -// NewRefIDStore creates and returns a new store and initializes a BoltDB -// instance in the specified root directory. -func NewRefIDStore(rootDir string) (*RefIDStore, error) { - r := &RefIDStore{ - bucketName: "ids", - rootDir: rootDir, - } - err := r.init() - if err != nil { - return nil, err - } - return r, nil -} - -func (r *RefIDStore) init() error { - db, err := bolt.Open(filepath.Join(r.rootDir, "ids.db"), 0755, nil) - if err != nil { - return err - } - err = db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte("ids")) - return err - }) - if err != nil { - return err - } - r.db = db - return nil -} - -// Set a cache key digest to the value of the worker ref ID. -func (r *RefIDStore) Set(ctx context.Context, key digest.Digest, id string) error { - return r.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(r.bucketName)) - return b.Put([]byte(key), []byte(id)) - }) -} - -// Get a worker ref ID given a cache key digest. -func (r *RefIDStore) Get(ctx context.Context, cacheKey digest.Digest) (string, bool, error) { - var id string - err := r.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(r.bucketName)) - id = string(b.Get([]byte(cacheKey))) - return nil - }) - if err != nil { - return "", false, err - } - if id == "" { - return "", false, nil - } - return id, true, nil -} - -func (r *RefIDStore) delete(_ context.Context, key string) error { - return r.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(r.bucketName)) - return b.Delete([]byte(key)) - }) -} - func newDigest(s string) digest.Digest { return digest.NewDigestFromEncoded(digest.SHA256, s) } diff --git a/worker/simple.go b/worker/simple.go index a5a4a4344..1973b634c 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -2,34 +2,46 @@ package worker import ( "context" + "path/filepath" "sync" + "time" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/bklog" digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" ) -// RefIDSource allows the caller to translate between a cache key and a worker ref ID. -type RefIDSource interface { - Get(ctx context.Context, cacheKey digest.Digest) (string, bool, error) -} +const refIDPrunePeriod = time.Minute // WorkerResultSource abstracts the work involved in loading a Result from a // worker using a ref ID. type WorkerResultSource struct { - wc *Controller - ids RefIDSource + wc *Controller + ids *refIDStore + prunePeriod time.Duration } // NewWorkerResultSource creates and returns a new *WorkerResultSource. -func NewWorkerResultSource(wc *Controller, ids RefIDSource) *WorkerResultSource { - return &WorkerResultSource{wc: wc, ids: ids} +func NewWorkerResultSource(wc *Controller, rootDir string) (*WorkerResultSource, error) { + ids, err := newRefIDStore(rootDir) + if err != nil { + return nil, err + } + w := &WorkerResultSource{ + wc: wc, + ids: ids, + prunePeriod: refIDPrunePeriod, + } + go w.pruneLoop(context.Background()) + return w, nil } // Load a cached result from a worker. func (w *WorkerResultSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { - id, ok, err := w.ids.Get(ctx, cacheKey) + fullID, ok, err := w.ids.get(cacheKey) if err != nil { return nil, false, err } @@ -38,7 +50,7 @@ func (w *WorkerResultSource) Load(ctx context.Context, cacheKey digest.Digest) ( return nil, false, nil } - workerID, refID, err := parseWorkerRef(id) + workerID, refID, err := parseWorkerRef(fullID) if err != nil { return nil, false, err } @@ -51,6 +63,7 @@ func (w *WorkerResultSource) Load(ctx context.Context, cacheKey digest.Digest) ( ref, err := worker.LoadRef(ctx, refID, false) if err != nil { if cache.IsNotFound(err) { + w.ids.del(cacheKey) bklog.G(ctx).Warnf("could not load ref from worker: %v", err) return nil, false, nil } @@ -60,6 +73,73 @@ func (w *WorkerResultSource) Load(ctx context.Context, cacheKey digest.Digest) ( return NewWorkerRefResult(ref, worker), true, nil } +func (w *WorkerResultSource) Link(ctx context.Context, cacheKey digest.Digest, refID string) error { + return w.ids.set(cacheKey, refID) +} + +func (w *WorkerResultSource) pruneLoop(ctx context.Context) { + tick := time.NewTicker(w.prunePeriod) + for range tick.C { + c, err := w.prune(ctx) + if err != nil { + bklog.G(ctx).Warnf("failed to prune ref IDs: %v", err) + } else { + bklog.G(ctx).Warnf("pruned %d stale ref IDs", c) + } + } +} + +func (w *WorkerResultSource) exists(ctx context.Context, fullID string) (bool, error) { + workerID, refID, err := parseWorkerRef(fullID) + if err != nil { + return false, err + } + + worker, err := w.wc.Get(workerID) + if err != nil { + return false, err + } + + ref, err := worker.LoadRef(ctx, refID, false) + if err != nil { + if cache.IsNotFound(err) { + return false, nil + } + return false, err + } + + ref.Release(ctx) + + return true, nil +} + +func (w *WorkerResultSource) prune(ctx context.Context) (int, error) { + var deleteIDs []digest.Digest + + err := w.ids.walk(func(d digest.Digest, id string) error { + exists, err := w.exists(ctx, id) + if err != nil { + return err + } + if !exists { + deleteIDs = append(deleteIDs, d) + } + return nil + }) + if err != nil { + return 0, err + } + + for _, deleteID := range deleteIDs { + err = w.ids.del(deleteID) + if err != nil { + return 0, err + } + } + + return len(deleteIDs), nil +} + var _ solver.ResultSource = &WorkerResultSource{} // FinalizeRef is a convenience function that calls Finalize on a Result's @@ -109,6 +189,10 @@ func (w *WorkerRemoteSource) Load(ctx context.Context, cacheKey digest.Digest) ( return NewWorkerRefResult(ref, w.worker), true, nil } +func (c *WorkerRemoteSource) Link(ctx context.Context, cacheKey digest.Digest, refID string) error { + return nil // noop +} + // AddResult adds a solver.Remote source for the given cache key. func (w *WorkerRemoteSource) AddResult(ctx context.Context, cacheKey digest.Digest, remote *solver.Remote) { w.mu.Lock() @@ -142,4 +226,113 @@ func (c *CombinedResultSource) Load(ctx context.Context, cacheKey digest.Digest) return nil, false, nil } +// Link a cache key to a ref ID. Only used by the worker result source. +func (c *CombinedResultSource) Link(ctx context.Context, cacheKey digest.Digest, refID string) error { + for _, source := range c.sources { + err := source.Link(ctx, cacheKey, refID) + if err != nil { + return nil + } + } + return nil +} + var _ solver.ResultSource = &CombinedResultSource{} + +// refIDStore uses a BoltDB database to store links from computed cache keys to +// worker ref IDs. +type refIDStore struct { + db *bolt.DB + rootDir string + bucket string + prunePeriod time.Duration +} + +// newRefIDStore creates and returns a new store and initializes a BoltDB +// instance in the specified root directory. +func newRefIDStore(rootDir string) (*refIDStore, error) { + r := &refIDStore{ + bucket: "ids", + rootDir: rootDir, + prunePeriod: refIDPrunePeriod, + } + err := r.init() + if err != nil { + return nil, err + } + return r, nil +} + +func (r *refIDStore) init() error { + db, err := bolt.Open(filepath.Join(r.rootDir, "ids.db"), 0755, nil) + if err != nil { + return err + } + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(r.bucket)) + return err + }) + r.db = db + return nil +} + +// Set a cache key digest to the value of the worker ref ID. It also sets the +// access time for the key. +func (r *refIDStore) set(cacheKey digest.Digest, id string) error { + err := r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + return b.Put([]byte(cacheKey), []byte(id)) + }) + if err != nil { + return errors.Wrap(err, "failed to set ref ID") + } + return nil +} + +// Get a worker ref ID given a cache key digest. It also sets the +// access time for the key. +func (r *refIDStore) get(cacheKey digest.Digest) (string, bool, error) { + var id string + err := r.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + id = string(b.Get([]byte(cacheKey))) + return nil + }) + if err != nil { + return "", false, errors.Wrap(err, "failed to load ref ID") + } + if id == "" { + return "", false, nil + } + return id, true, nil +} + +func (r *refIDStore) del(cacheKey digest.Digest) error { + err := r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + return b.Delete([]byte(cacheKey)) + }) + if err != nil { + return errors.Wrap(err, "failed to delete key") + } + return nil +} + +func (r *refIDStore) walk(fn func(digest.Digest, string) error) error { + err := r.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + err := b.ForEach(func(k, v []byte) error { + d := digest.FromString(string(k)) + return fn(d, string(v)) + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to iterate ref IDs") + } + + return err +}