diff --git a/solver/jobs.go b/solver/jobs.go index b23a853a6..7fa06be10 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -20,7 +20,7 @@ import ( "go.opentelemetry.io/otel/trace" ) -// ResolveOpFunc finds an Op implementation for a Vertex +// ResolveOpFunc finds an Op implementation for a Vertex. type ResolveOpFunc func(Vertex, Builder) (Op, error) type Builder interface { @@ -261,6 +261,7 @@ type SolverOpt struct { ResultSource ResultSource RefIDStore *RefIDStore CommitRefFunc CommitRefFunc + IsRunOnceFunc IsRunOnceFunc } func NewSolver(opts SolverOpt) *Solver { @@ -280,6 +281,7 @@ func NewSolver(opts SolverOpt) *Solver { solver, opts.RefIDStore, opts.ResultSource, + opts.IsRunOnceFunc, ) solver.simple = simple diff --git a/solver/llbsolver/simple.go b/solver/llbsolver/simple.go new file mode 100644 index 000000000..3646c5677 --- /dev/null +++ b/solver/llbsolver/simple.go @@ -0,0 +1,29 @@ +package llbsolver + +import ( + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/solver/llbsolver/ops" +) + +// isRunOnce returns a function that can be called to determine if a Vertex +// contains an operation that must be run at least once per build. +func (s *Solver) isRunOnceOp() solver.IsRunOnceFunc { + return func(v solver.Vertex, b solver.Builder) (bool, error) { + w, err := s.resolveWorker() + if err != nil { + return false, err + } + + op, err := w.ResolveOp(v, s.Bridge(b), s.sm) + if err != nil { + return false, err + } + + switch op.(type) { + case *ops.SourceOp: + return true, nil + default: + return false, nil + } + } +} diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index 12dae8052..4c228e354 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -144,6 +144,7 @@ func New(opt Opt) (*Solver, error) { s.solver = solver.NewSolver(solver.SolverOpt{ ResolveOpFunc: s.resolver(), + IsRunOnceFunc: s.isRunOnceOp(), DefaultCache: opt.CacheManager, ResultSource: sources, CommitRefFunc: worker.FinalizeRef, diff --git a/solver/simple.go b/solver/simple.go index 5fa347c5b..577ee46ed 100644 --- a/solver/simple.go +++ b/solver/simple.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/hashicorp/golang-lru/simplelru" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/tracing" @@ -21,21 +22,52 @@ import ( // CommitRefFunc can be used to finalize a Result's ImmutableRef. type CommitRefFunc func(ctx context.Context, result Result) error +// IsRunOnceFunc determines if the vertex represents an operation that needs to +// be run at least once. +type IsRunOnceFunc func(Vertex, Builder) (bool, error) + // ResultSource can be any source (local or remote) that allows one to load a // Result using a cache key digest. type ResultSource interface { Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error) } +type runOnceCtrl struct { + lru *simplelru.LRU + mu sync.Mutex +} + +func newRunOnceCtrl() *runOnceCtrl { + lru, _ := simplelru.NewLRU(1e3, nil) // Error impossible on positive first argument. + return &runOnceCtrl{lru: lru} +} + +// hasRun: Here, we use an LRU cache to whether we need to execute the source +// operation for this job. The jobs may be re-run if the LRU size is exceeded, +// but this shouldn't have a big impact on the build. The trade-off is +// worthwhile given the memory-friendliness of LRUs. +func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + key := fmt.Sprintf("%s:%s", sessionID, d) + ret := s.lru.Contains(key) + + s.lru.Add(key, struct{}{}) + + return ret +} + type simpleSolver struct { resolveOpFunc ResolveOpFunc + isRunOnceFunc IsRunOnceFunc commitRefFunc CommitRefFunc solver *Solver parallelGuard *parallelGuard refIDStore *RefIDStore resultSource ResultSource cacheKeyManager *cacheKeyManager - mu sync.Mutex + runOnceCtrl *runOnceCtrl } func newSimpleSolver( @@ -44,6 +76,7 @@ func newSimpleSolver( solver *Solver, refIDStore *RefIDStore, resultSource ResultSource, + isRunOnceFunc IsRunOnceFunc, ) *simpleSolver { return &simpleSolver{ cacheKeyManager: newCacheKeyManager(), @@ -53,6 +86,8 @@ func newSimpleSolver( solver: solver, refIDStore: refIDStore, resultSource: resultSource, + isRunOnceFunc: isRunOnceFunc, + runOnceCtrl: newRunOnceCtrl(), } } @@ -75,6 +110,14 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul return nil, err } + // Release previous result as this is not the final return value. + if ret != nil { + err := ret.Release(ctx) + if err != nil { + return nil, err + } + } + ret = res // Hijack the CacheKey type in order to export a reference from the new cache key to the ref ID. @@ -87,6 +130,11 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul }) } + err := s.commitRefFunc(ctx, ret) + if err != nil { + return nil, err + } + return NewCachedResult(ret, expKeys), nil } @@ -121,14 +169,25 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver defer done() <-wait - v, ok, err := s.resultSource.Load(ctx, cacheKey) + isRunOnce, err := s.isRunOnceFunc(vertex, job) if err != nil { return nil, "", err } - if ok && v != nil { - notifyError(ctx, st, true, nil) - return v, cacheKey, nil + // Special case for source operations. They need to be run once per build or + // content changes will not be reliably detected. + mayLoadCache := !isRunOnce || isRunOnce && s.runOnceCtrl.hasRun(cacheKey, job.SessionID) + + if mayLoadCache { + v, ok, err := s.resultSource.Load(ctx, cacheKey) + if err != nil { + return nil, "", err + } + + if ok && v != nil { + notifyError(ctx, st, true, nil) + return v, cacheKey, nil + } } results, _, err := st.op.Exec(ctx, inputs) @@ -136,17 +195,17 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver return nil, "", err } - // Ensure all results are finalized (committed to cache). It may be better - // to background these calls at some point. - for _, res := range results { - err = s.commitRefFunc(ctx, res) - if err != nil { - return nil, "", err + res := results[int(e.Index)] + + for i := range results { + if i != int(e.Index) { + err = results[i].Release(ctx) + if err != nil { + return nil, "", err + } } } - res := results[int(e.Index)] - err = s.refIDStore.Set(ctx, cacheKey, res.ID()) if err != nil { return nil, "", err @@ -306,6 +365,14 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V } } + // The result can be released now that the preprocess & slow cache + // digest functions have been run. This is crucial as failing to do so + // will lead to full file copying from mutable snapshots. + err = res.Release(ctx) + if err != nil { + return nil, err + } + // Add input references to the struct as to link dependencies. scm.inputs[i] = in.Vertex.Digest()