From c030a708f9b82f85b016e7f48060ebb5b2ed72c3 Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Tue, 16 May 2023 17:25:36 +0100 Subject: [PATCH] build: move main solve request into main gateway call Now, we always perform the full solve request in the main gateway call. This ensures that progress works properly, and makes the lifetime semantics much clearer. NewResultContext abstracts the details of a successful/failed build, to always return a single ResultContext, even though the details of how a gateway is created are different: - For a failed build, we can just keep the gateway open. - For a successful build, we immediately open another gateway and re-evaluate the build definition in that gateway. This should give an instant cache hit (since the build was just successful). Signed-off-by: Justin Chadwell --- build/build.go | 22 ++-- build/result.go | 330 ++++++++++++++++++++++++++++++------------------ 2 files changed, 221 insertions(+), 131 deletions(-) diff --git a/build/build.go b/build/build.go index dcb0194f37c1..de634b53882f 100644 --- a/build/build.go +++ b/build/build.go @@ -882,10 +882,11 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s ch, done := progress.NewChannel(pw) defer func() { <-done }() + _ = done cc := c var printRes map[string][]byte - rr, err := c.Build(ctx, so, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + buildFunc := func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { if opt.PrintFunc != nil { if _, ok := req.FrontendOpt["frontend.caps"]; !ok { req.FrontendOpt["frontend.caps"] = "moby.buildkit.frontend.subrequests+forward" @@ -915,7 +916,6 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s } if fallback { - fmt.Println("falling back!") req.FrontendOpt["build-arg:BUILDKIT_SYNTAX"] = printFallbackImage res2, err2 := c.Solve(ctx, req) if err2 != nil { @@ -931,16 +931,16 @@ func BuildWithResultHandler(ctx 
context.Context, nodes []builder.Node, opt map[s } results.Set(resultKey(dp.driverIndex, k), res) - if resultHandleFunc != nil { - resultCtx, err := NewResultContext(ctx, cc, so, res) - if err == nil { - resultHandleFunc(dp.driverIndex, resultCtx) - } else { - logrus.Warnf("failed to record result: %s", err) - } - } return res, nil - }, ch) + } + var rr *client.SolveResponse + if resultHandleFunc != nil { + var resultCtx *ResultContext + resultCtx, rr, err = NewResultContext(ctx, cc, so, "buildx", buildFunc, ch) + resultHandleFunc(dp.driverIndex, resultCtx) + } else { + rr, err = c.Build(ctx, so, "buildx", buildFunc, ch) + } if err != nil { return err } diff --git a/build/result.go b/build/result.go index de8a6728acfd..cd7e2caea2ff 100644 --- a/build/result.go +++ b/build/result.go @@ -6,8 +6,6 @@ import ( "encoding/json" "io" "sync" - "sync/atomic" - "time" controllerapi "github.com/docker/buildx/controller/pb" "github.com/moby/buildkit/client" @@ -22,140 +20,229 @@ import ( "golang.org/x/sync/errgroup" ) -func NewResultContext(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) { - def, err := getDefinition(ctx, res) - if err != nil { - return nil, err - } - return getResultAt(ctx, c, solveOpt, def, nil) -} +// NewResultContext wraps a call to client.Build, additionally returning a +// ResultContext alongside the standard response and error. +// +// This ResultContext can be used to execute build steps in the same context as +// the build occurred, which can allow easy debugging of build failures and +// successes. +// +// If the returned ResultContext is not nil, the caller must call Done() on it. 
+func NewResultContext(ctx context.Context, cc *client.Client, opt client.SolveOpt, product string, buildFunc gateway.BuildFunc, ch chan *client.SolveStatus) (*ResultContext, *client.SolveResponse, error) { + resCtxCh := make(chan *ResultContext, 1) + respCh := make(chan *client.SolveResponse, 1) + errCh := make(chan error, 1) -func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb.Definition], error) { - return result.ConvertResult(res, func(ref gateway.Reference) (*pb.Definition, error) { - st, err := ref.ToState() - if err != nil { - return nil, err - } - def, err := st.Marshal(ctx) - if err != nil { - return nil, err - } - return def.ToPB(), nil - }) -} - -func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, targets *result.Result[*pb.Definition], statusChan chan *client.SolveStatus) (*ResultContext, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // forward SolveStatus - done := new(atomic.Bool) - defer done.Store(true) - ch := make(chan *client.SolveStatus) go func() { - for { - s := <-ch - if s == nil { - return + // once done is closed, we've recorded a ResultContext, so we shouldn't + // cancel the underlying solve request + baseCtx := ctx + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + select { + case <-baseCtx.Done(): + cancel() + case <-done: } - if done.Load() { - // Do not forward if the function returned because statusChan is possibly closed - continue + }() + + // allows closing the original channel, and discarding the rest of the + // status messages in the case that we get a solve error + baseCh := ch + ch := make(chan *client.SolveStatus) + go func() { + for { + s, ok := <-ch + if !ok { + return + } + baseCh <- s } - select { - case statusChan <- s: - case <-ctx.Done(): + }() + + var res *gateway.Result + resp, err := cc.Build(ctx, opt, product, func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { 
+ var err error + res, err = buildFunc(ctx, c) + + if res != nil && err == nil { + // force evaluation of all targets + def, err2 := getDefinition(ctx, res) + if err2 != nil { + return nil, err2 + } + res2, err2 := evalDefinition(ctx, c, def) + if err != nil { + return nil, err + } + res = res2 + err = err2 } - } - }() - // get result - resultCtxCh := make(chan *ResultContext) - errCh := make(chan error) - go func() { - solveOpt := solveOpt - solveOpt.Ref = "" - buildDoneCh := make(chan struct{}) - _, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { - doneErr := errors.Errorf("done") - ctx, cancel := context.WithCancelCause(ctx) - defer cancel(doneErr) - - // force evaluation of all targets in parallel - results := make(map[*pb.Definition]*gateway.Result) - resultsMu := sync.Mutex{} - eg, egCtx := errgroup.WithContext(ctx) - targets.EachRef(func(def *pb.Definition) error { - eg.Go(func() error { - res2, err := c.Solve(egCtx, gateway.SolveRequest{ - Evaluate: true, - Definition: def, - }) - if err != nil { - return err - } - resultsMu.Lock() - results[def] = res2 - resultsMu.Unlock() - return nil - }) - return nil - }) - resultCtx := ResultContext{} - if err := eg.Wait(); err != nil { + if err != nil { var se *errdefs.SolveError if errors.As(err, &se) { - resultCtx.solveErr = se - } else { - return nil, err + resCtx := &ResultContext{ + done: make(chan struct{}), + solveErr: se, + gwClient: c, + gwCtx: ctx, + } + close(done) + close(baseCh) + resCtxCh <- resCtx + errCh <- err + select { + case <-resCtx.done: + case <-ctx.Done(): + } } } - res2, _ := result.ConvertResult(targets, func(def *pb.Definition) (gateway.Reference, error) { - if res, ok := results[def]; ok { - return res.Ref, nil - } - return nil, nil - }) + return res, err + }, ch) + select { + case _, ok := <-done: + if !ok { + cancel() + return + } + default: + } + close(baseCh) + + if resp != nil { + respCh <- resp + } + if 
err != nil { + resCtxCh <- nil + errCh <- err + cancel() + return + } - // Record the client and ctx as well so that containers can be created from the SolveError. - resultCtx.res = res2 - resultCtx.gwClient = c - resultCtx.gwCtx = ctx - resultCtx.gwDone = func() { - cancel(doneErr) - // wait for Build() completion(or timeout) to ensure the Build's finalizing and avoiding an error "context canceled" + go func() { + def, err := getDefinition(ctx, res) + if err != nil { + resCtxCh <- nil + errCh <- err + cancel() + return + } + + // NOTE: ideally this second connection should be lazily opened + // NOTE: or... we could open this session *during* the other + // session, which would let us grab the same vertices + // immediately, mitigating the risk for "inconsistent solve + // result" + opt := opt + opt.Ref = "" + _, err = cc.Build(ctx, opt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + res, err := evalDefinition(ctx, c, def) + if err != nil { + return nil, errors.Wrap(err, "inconsistent solve result") + } + resCtx := &ResultContext{ + done: make(chan struct{}), + res: res, + gwClient: c, + gwCtx: ctx, + } + close(done) + resCtxCh <- resCtx select { - case <-buildDoneCh: - case <-time.After(5 * time.Second): + case <-resCtx.done: + return nil, nil + case <-ctx.Done(): + return nil, ctx.Err() } - } + }, nil) select { - case resultCtxCh <- &resultCtx: - case <-ctx.Done(): - return nil, ctx.Err() + case _, ok := <-done: + if !ok { + cancel() + return + } + default: } + if err != nil { + resCtxCh <- nil + errCh <- err + } + cancel() + }() + }() - // wait for cleanup or cancel - <-ctx.Done() - if context.Cause(ctx) != doneErr { // doneErr is not an error. 
- return nil, ctx.Err() + var resCtx *ResultContext + var haveResCtx bool + var resp *client.SolveResponse + var err error + for { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case resCtx = <-resCtxCh: + haveResCtx = true + if resp != nil || err != nil { + return resCtx, resp, err } - return nil, nil - }, ch) - close(buildDoneCh) + case resp = <-respCh: + if haveResCtx { + return resCtx, resp, err + } + case err = <-errCh: + if haveResCtx { + return resCtx, nil, err + } + } + } +} + +func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb.Definition], error) { + return result.ConvertResult(res, func(ref gateway.Reference) (*pb.Definition, error) { + st, err := ref.ToState() if err != nil { - errCh <- err + return nil, err } - }() + def, err := st.Marshal(ctx) + if err != nil { + return nil, err + } + return def.ToPB(), nil + }) +} - select { - case resultCtx := <-resultCtxCh: - return resultCtx, nil - case err := <-errCh: +func evalDefinition(ctx context.Context, c gateway.Client, defs *result.Result[*pb.Definition]) (*gateway.Result, error) { + // force evaluation of all targets in parallel + results := make(map[*pb.Definition]*gateway.Result) + resultsMu := sync.Mutex{} + eg, egCtx := errgroup.WithContext(ctx) + defs.EachRef(func(def *pb.Definition) error { + eg.Go(func() error { + res2, err := c.Solve(egCtx, gateway.SolveRequest{ + Evaluate: true, + Definition: def, + }) + if err != nil { + return err + } + resultsMu.Lock() + results[def] = res2 + resultsMu.Unlock() + return nil + }) + return nil + }) + if err := eg.Wait(); err != nil { return nil, err - case <-ctx.Done(): - return nil, ctx.Err() } + res2, _ := result.ConvertResult(defs, func(def *pb.Definition) (gateway.Reference, error) { + if res, ok := results[def]; ok { + return res.Ref, nil + } + return nil, nil + }) + return res2, nil } // ResultContext is a build result with the client that built it. 
@@ -163,17 +250,18 @@ type ResultContext struct { res *gateway.Result solveErr *errdefs.SolveError - gwClient gateway.Client - gwCtx context.Context - gwDone func() - gwDoneOnce sync.Once + done chan struct{} + doneOnce sync.Once + + gwClient gateway.Client + gwCtx context.Context cleanups []func() cleanupsMu sync.Mutex } func (r *ResultContext) Done() { - r.gwDoneOnce.Do(func() { + r.doneOnce.Do(func() { r.cleanupsMu.Lock() cleanups := r.cleanups r.cleanups = nil @@ -181,7 +269,9 @@ func (r *ResultContext) Done() { for _, f := range cleanups { f() } - r.gwDone() + + close(r.done) + <-r.gwCtx.Done() }) }