Skip to content

Commit

Permalink
Prune ref ID database & initialize cache key manager per build
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejholly committed May 24, 2024
1 parent d144e62 commit e7edfd8
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 114 deletions.
2 changes: 0 additions & 2 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ type SolverOpt struct {
ResolveOpFunc ResolveOpFunc
DefaultCache CacheManager
ResultSource ResultSource
RefIDStore *RefIDStore
CommitRefFunc CommitRefFunc
IsRunOnceFunc IsRunOnceFunc
}
Expand All @@ -279,7 +278,6 @@ func NewSolver(opts SolverOpt) *Solver {
opts.ResolveOpFunc,
opts.CommitRefFunc,
solver,
opts.RefIDStore,
opts.ResultSource,
opts.IsRunOnceFunc,
)
Expand Down
5 changes: 2 additions & 3 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ func New(opt Opt) (*Solver, error) {
}
s.sysSampler = sampler

refIDStore, err := solver.NewRefIDStore(opt.RootDir)
workerSource, err := worker.NewWorkerResultSource(opt.WorkerController, opt.RootDir)
if err != nil {
return nil, err
}

sources := worker.NewCombinedResultSource(
worker.NewWorkerResultSource(opt.WorkerController, refIDStore),
workerSource,
remoteSource,
)

Expand All @@ -148,7 +148,6 @@ func New(opt Opt) (*Solver, error) {
DefaultCache: opt.CacheManager,
ResultSource: sources,
CommitRefFunc: worker.FinalizeRef,
RefIDStore: refIDStore,
})
return s, nil
}
Expand Down
124 changes: 25 additions & 99 deletions solver/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"hash"
"io"
"path/filepath"
"sync"
"time"

Expand All @@ -16,7 +15,6 @@ import (
"github.com/moby/buildkit/util/tracing"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)

const (
Expand All @@ -35,6 +33,7 @@ type IsRunOnceFunc func(Vertex, Builder) (bool, error)
// Result using a cache key digest.
type ResultSource interface {
Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error)
Link(ctx context.Context, cacheKey digest.Digest, refID string) error
}

// runOnceCtrl is a simple wrapper around an LRU cache. It's used to ensure that
Expand Down Expand Up @@ -67,35 +66,30 @@ func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool {
}

type simpleSolver struct {
resolveOpFunc ResolveOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
refIDStore *RefIDStore
resultSource ResultSource
cacheKeyManager *cacheKeyManager
runOnceCtrl *runOnceCtrl
resolveOpFunc ResolveOpFunc
isRunOnceFunc IsRunOnceFunc
commitRefFunc CommitRefFunc
solver *Solver
parallelGuard *parallelGuard
resultSource ResultSource
runOnceCtrl *runOnceCtrl
}

func newSimpleSolver(
resolveOpFunc ResolveOpFunc,
commitRefFunc CommitRefFunc,
solver *Solver,
refIDStore *RefIDStore,
resultSource ResultSource,
isRunOnceFunc IsRunOnceFunc,
) *simpleSolver {
return &simpleSolver{
cacheKeyManager: newCacheKeyManager(),
parallelGuard: newParallelGuard(parallelGuardWait),
resolveOpFunc: resolveOpFunc,
commitRefFunc: commitRefFunc,
solver: solver,
refIDStore: refIDStore,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
parallelGuard: newParallelGuard(parallelGuardWait),
resolveOpFunc: resolveOpFunc,
commitRefFunc: commitRefFunc,
solver: solver,
resultSource: resultSource,
isRunOnceFunc: isRunOnceFunc,
runOnceCtrl: newRunOnceCtrl(),
}
}

Expand All @@ -107,13 +101,15 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
var ret Result
var expKeys []ExportableCacheKey

cacheKeyMan := newCacheKeyManager()

for _, d := range digests {
vertex, ok := vertices[d]
if !ok {
return nil, errors.Errorf("digest %s not found", d)
}

res, cacheKey, err := s.buildOne(ctx, d, vertex, job, e)
res, cacheKey, err := s.buildOne(ctx, cacheKeyMan, d, vertex, job, e)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,7 +142,7 @@ func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResul
return NewCachedResult(ret, expKeys), nil
}

func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) {
func (s *simpleSolver) buildOne(ctx context.Context, cacheKeyMan *cacheKeyManager, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) {
st := s.state(vertex, job)

// Add cache opts to context as they will be accessed by cache retrieval.
Expand All @@ -158,13 +154,13 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
return nil, "", err
}

inputs, err := s.preprocessInputs(ctx, st, vertex, cm.CacheMap, job)
inputs, err := s.preprocessInputs(ctx, cacheKeyMan, st, vertex, cm.CacheMap, job)
if err != nil {
notifyError(ctx, st, false, err)
return nil, "", err
}

cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d)
cacheKey, err := cacheKeyMan.cacheKey(ctx, d)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -214,7 +210,7 @@ func (s *simpleSolver) buildOne(ctx context.Context, d digest.Digest, vertex Ver
}
}

err = s.refIDStore.Set(ctx, cacheKey, res.ID())
err = s.resultSource.Link(ctx, cacheKey, res.ID())
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -306,7 +302,7 @@ func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Dige
return ret, vertices
}

func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) {
func (s *simpleSolver) preprocessInputs(ctx context.Context, cacheKeyMan *cacheKeyManager, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) {
// This struct is used to reconstruct a cache key from an LLB digest & all
// parents using consistent digests that depend on the full dependency chain.
scm := simpleCacheMap{
Expand All @@ -330,7 +326,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
d := in.Vertex.Digest()

// Compute a cache key given the LLB digest value.
cacheKey, err := s.cacheKeyManager.cacheKey(ctx, d)
cacheKey, err := cacheKeyMan.cacheKey(ctx, d)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -390,7 +386,7 @@ func (s *simpleSolver) preprocessInputs(ctx context.Context, st *state, vertex V
inputs = append(inputs, res)
}

s.cacheKeyManager.add(vertex.Digest(), &scm)
cacheKeyMan.add(vertex.Digest(), &scm)

return inputs, nil
}
Expand Down Expand Up @@ -523,76 +519,6 @@ func (f *parallelGuard) acquire(ctx context.Context, d digest.Digest) (<-chan st
return ch, closer
}

// RefIDStore uses a BoltDB database to store links from computed cache keys to
// worker ref IDs.
type RefIDStore struct {
db *bolt.DB
bucketName string
rootDir string
}

// NewRefIDStore creates and returns a new store and initializes a BoltDB
// instance in the specified root directory.
func NewRefIDStore(rootDir string) (*RefIDStore, error) {
r := &RefIDStore{
bucketName: "ids",
rootDir: rootDir,
}
err := r.init()
if err != nil {
return nil, err
}
return r, nil
}

func (r *RefIDStore) init() error {
db, err := bolt.Open(filepath.Join(r.rootDir, "ids.db"), 0755, nil)
if err != nil {
return err
}
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte("ids"))
return err
})
if err != nil {
return err
}
r.db = db
return nil
}

// Set a cache key digest to the value of the worker ref ID.
func (r *RefIDStore) Set(ctx context.Context, key digest.Digest, id string) error {
return r.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(r.bucketName))
return b.Put([]byte(key), []byte(id))
})
}

// Get a worker ref ID given a cache key digest.
func (r *RefIDStore) Get(ctx context.Context, cacheKey digest.Digest) (string, bool, error) {
var id string
err := r.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(r.bucketName))
id = string(b.Get([]byte(cacheKey)))
return nil
})
if err != nil {
return "", false, err
}
if id == "" {
return "", false, nil
}
return id, true, nil
}

func (r *RefIDStore) delete(_ context.Context, key string) error {
return r.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(r.bucketName))
return b.Delete([]byte(key))
})
}

func newDigest(s string) digest.Digest {
return digest.NewDigestFromEncoded(digest.SHA256, s)
}
Loading

0 comments on commit e7edfd8

Please sign in to comment.