Skip to content

Commit

Permalink
Lock ops based on computed key rather than LLB digest
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejholly committed May 21, 2024
1 parent d4e630c commit dccc5a2
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 16 deletions.
14 changes: 8 additions & 6 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ResolveOpFunc finds an Op implementation for a Vertex
// ResolveOpFunc finds an Op implementation for a Vertex.
type ResolveOpFunc func(Vertex, Builder) (Op, error)

type Builder interface {
Expand Down Expand Up @@ -256,11 +256,12 @@ type Job struct {
}

type SolverOpt struct {
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
IsSourceOpFunc IsSourceOpFunc
}

func NewSolver(opts SolverOpt) *Solver {
Expand All @@ -280,6 +281,7 @@ func NewSolver(opts SolverOpt) *Solver {
solver,
opts.RefIDStore,
opts.ResultSource,
opts.IsSourceOpFunc,
)
solver.simple = simple

Expand Down
29 changes: 29 additions & 0 deletions solver/llbsolver/simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package llbsolver

import (
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver/ops"
)

// isSourceOp builds the solver.IsSourceOpFunc handed to the solver. The
// returned function resolves the vertex to its Op through the worker and
// reports whether the concrete type of that Op is *ops.SourceOp. Any error
// from resolving the worker or the Op is returned with a false result.
func (s *Solver) isSourceOp() solver.IsSourceOpFunc {
	return func(v solver.Vertex, b solver.Builder) (bool, error) {
		wk, err := s.resolveWorker()
		if err != nil {
			return false, err
		}

		resolved, err := wk.ResolveOp(v, s.Bridge(b), s.sm)
		if err != nil {
			return false, err
		}

		// The comma-ok assertion yields false for every other Op type.
		_, isSource := resolved.(*ops.SourceOp)
		return isSource, nil
	}
}
11 changes: 6 additions & 5 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ func New(opt Opt) (*Solver, error) {
)

s.solver = solver.NewSolver(solver.SolverOpt{
ResolveOpFunc: s.resolver(),
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
RefIDStore: refIDStore,
ResolveOpFunc: s.resolver(),
IsSourceOpFunc: s.isSourceOp(),
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
RefIDStore: refIDStore,
})
return s, nil
}
Expand Down
57 changes: 52 additions & 5 deletions solver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/hashicorp/golang-lru/simplelru"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/tracing"
Expand All @@ -21,21 +22,49 @@ import (
// CommitRefFunc can be used to finalize a Result's ImmutableRef. It receives
// the caller's context and the Result to finalize, and reports failure through
// the returned error.
type CommitRefFunc func(ctx context.Context, result Result) error

// IsSourceOpFunc determines if the vertex represents a source op. It is given
// the Vertex to inspect together with a Builder. The boolean result should
// only be relied upon when the returned error is nil.
type IsSourceOpFunc func(Vertex, Builder) (bool, error)

// ResultSource can be any source (local or remote) that allows one to load a
// Result using a cache key digest.
type ResultSource interface {
// Load looks up the Result associated with cacheKey. The boolean presumably
// reports whether a Result was found: the caller visible in this package
// only uses the Result when the boolean is true and the Result is non-nil.
Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error)
}

// sourceOpCtrl remembers which session/digest pairs have already been seen,
// using a bounded LRU so the record cannot grow without limit.
type sourceOpCtrl struct {
	// mu serializes all access to lru.
	mu sync.Mutex
	// lru holds one entry per "sessionID:digest" key that has been recorded.
	lru *simplelru.LRU
}

// newSourceOpCtrl returns a sourceOpCtrl backed by an empty LRU that can hold
// up to 1000 keys.
func newSourceOpCtrl() *sourceOpCtrl {
	const maxEntries = 1000

	// The error is discarded on purpose: it is impossible when the size
	// argument is positive.
	cache, _ := simplelru.NewLRU(maxEntries, nil)

	return &sourceOpCtrl{lru: cache}
}

// processed reports whether the pair (sessionID, d) has been recorded before,
// and records it as a side effect.
//
// The first call for a given pair returns false; later calls return true for
// as long as the key has not been evicted from the LRU. The check and the
// insertion happen under the same lock, so concurrent callers using the same
// pair cannot both observe false.
func (s *sourceOpCtrl) processed(d digest.Digest, sessionID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := fmt.Sprintf("%s:%s", sessionID, d)

	// Membership must be read before Add, otherwise every key would appear
	// to have been seen already.
	seen := s.lru.Contains(key)
	s.lru.Add(key, struct{}{})

	return seen
}

// simpleSolver bundles the collaborators used to build vertices one at a
// time. Only the fields whose use is visible in this file are described
// below.
type simpleSolver struct {
resolveOpFunc ResolveOpFunc
// isSourceOpFunc is consulted by buildOne to learn whether the vertex being
// built is a source op.
isSourceOpFunc IsSourceOpFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
refIDStore *RefIDStore
// resultSource is queried by cache key in buildOne to reuse an existing
// Result instead of executing the op.
resultSource ResultSource
cacheKeyManager *cacheKeyManager
mu sync.Mutex
// sourceOpCtrl records which source ops have already been seen for a
// session; buildOne only allows a cache load for a source op once it has
// been recorded.
sourceOpCtrl *sourceOpCtrl
}

func newSimpleSolver(
Expand All @@ -44,6 +73,7 @@ func newSimpleSolver(
solver *Solver,
refIDStore *RefIDStore,
resultSource ResultSource,
isSourceOpFunc IsSourceOpFunc,
) *simpleSolver {
return &simpleSolver{
cacheKeyManager: newCacheKeyManager(),
Expand All @@ -53,6 +83,8 @@ func newSimpleSolver(
solver: solver,
refIDStore: refIDStore,
resultSource: resultSource,
isSourceOpFunc: isSourceOpFunc,
sourceOpCtrl: newSourceOpCtrl(),
}
}

Expand Down Expand Up @@ -121,14 +153,28 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
defer done()
<-wait

v, ok, err := s.resultSource.Load(ctx, cacheKey)
isSourceOp, err := s.isSourceOpFunc(vertex, job)
if err != nil {
return nil, "", err
}

if ok && v != nil {
notifyError(ctx, st, true, nil)
return v, cacheKey, nil
// It is essential to execute source operations in order to detect content
// changes, but these operations should only be run once per build. Here, we
// use an LRU cache to determine whether we need to execute the source
// operation for this job. The jobs may be re-run if the LRU size is exceeded,
// but this shouldn't have a big impact on the build.
canLoadCache := !isSourceOp || isSourceOp && s.sourceOpCtrl.processed(cacheKey, job.SessionID)

if canLoadCache {
v, ok, err := s.resultSource.Load(ctx, cacheKey)
if err != nil {
return nil, "", err
}

if ok && v != nil {
notifyError(ctx, st, true, nil)
return v, cacheKey, nil
}
}

results, _, err := st.op.Exec(ctx, inputs)
Expand Down Expand Up @@ -298,6 +344,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
// operation.
if dep.ComputeDigestFunc != nil {
compDigest, err := dep.ComputeDigestFunc(ctx, res, st)
fmt.Println("slow cache digest", compDigest)
if err != nil {
bklog.G(ctx).Warnf("failed to compute digest: %v", err)
return nil, err
Expand Down

0 comments on commit dccc5a2

Please sign in to comment.