diff --git a/build/build.go b/build/build.go index dcb0194f37c1..de634b53882f 100644 --- a/build/build.go +++ b/build/build.go @@ -882,10 +882,11 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s ch, done := progress.NewChannel(pw) defer func() { <-done }() + _ = done cc := c var printRes map[string][]byte - rr, err := c.Build(ctx, so, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + buildFunc := func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { if opt.PrintFunc != nil { if _, ok := req.FrontendOpt["frontend.caps"]; !ok { req.FrontendOpt["frontend.caps"] = "moby.buildkit.frontend.subrequests+forward" @@ -915,7 +916,6 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s } if fallback { - fmt.Println("falling back!") req.FrontendOpt["build-arg:BUILDKIT_SYNTAX"] = printFallbackImage res2, err2 := c.Solve(ctx, req) if err2 != nil { @@ -931,16 +931,16 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s } results.Set(resultKey(dp.driverIndex, k), res) - if resultHandleFunc != nil { - resultCtx, err := NewResultContext(ctx, cc, so, res) - if err == nil { - resultHandleFunc(dp.driverIndex, resultCtx) - } else { - logrus.Warnf("failed to record result: %s", err) - } - } return res, nil - }, ch) + } + var rr *client.SolveResponse + if resultHandleFunc != nil { + var resultCtx *ResultContext + resultCtx, rr, err = NewResultContext(ctx, cc, so, "buildx", buildFunc, ch) + resultHandleFunc(dp.driverIndex, resultCtx) + } else { + rr, err = c.Build(ctx, so, "buildx", buildFunc, ch) + } if err != nil { return err } diff --git a/build/result.go b/build/result.go index de8a6728acfd..cd7e2caea2ff 100644 --- a/build/result.go +++ b/build/result.go @@ -6,8 +6,6 @@ import ( "encoding/json" "io" "sync" - "sync/atomic" - "time" controllerapi "github.com/docker/buildx/controller/pb" "github.com/moby/buildkit/client" @@ -22,140 +20,229 @@ import ( "golang.org/x/sync/errgroup" ) -func NewResultContext(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) { - def, err := getDefinition(ctx, res) - if err != nil { - return nil, err - } - return getResultAt(ctx, c, solveOpt, def, nil) -} +// NewResultContext wraps a call to client.Build, additionally returning a +// ResultContext alongside the standard response and error. +// +// This ResultContext can be used to execute build steps in the same context as +// the build occurred, which can allow easy debugging of build failures and +// successes. +// +// If the returned ResultContext is not nil, the caller must call Done() on it. +func NewResultContext(ctx context.Context, cc *client.Client, opt client.SolveOpt, product string, buildFunc gateway.BuildFunc, ch chan *client.SolveStatus) (*ResultContext, *client.SolveResponse, error) { + resCtxCh := make(chan *ResultContext, 1) + respCh := make(chan *client.SolveResponse, 1) + errCh := make(chan error, 1) -func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb.Definition], error) { - return result.ConvertResult(res, func(ref gateway.Reference) (*pb.Definition, error) { - st, err := ref.ToState() - if err != nil { - return nil, err - } - def, err := st.Marshal(ctx) - if err != nil { - return nil, err - } - return def.ToPB(), nil - }) -} - -func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, targets *result.Result[*pb.Definition], statusChan chan *client.SolveStatus) (*ResultContext, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // forward SolveStatus - done := new(atomic.Bool) - defer done.Store(true) - ch := make(chan *client.SolveStatus) go func() { - for { - s := <-ch - if s == nil { - return + // once done is closed, we've recorded a ResultContext, so we shouldn't + // cancel the underlying solve request + baseCtx := ctx + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + select { + case <-baseCtx.Done(): + cancel() + case <-done: } - if done.Load() { - // Do not forward if the function returned because statusChan is possibly closed - continue + }() + + // allows closing the original channel, and discarding the rest of the + // status messages in the case that we get a solve error + baseCh := ch + ch := make(chan *client.SolveStatus) + go func() { + for { + s, ok := <-ch + if !ok { + return + } + baseCh <- s } - select { - case statusChan <- s: - case <-ctx.Done(): + }() + + var res *gateway.Result + resp, err := cc.Build(ctx, opt, product, func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + var err error + res, err = buildFunc(ctx, c) + + if res != nil && err == nil { + // force evaluation of all targets + def, err2 := getDefinition(ctx, res) + if err2 != nil { + return nil, err2 + } + res2, err2 := evalDefinition(ctx, c, def) + if err != nil { + return nil, err + } + res = res2 + err = err2 } - } - }() - // get result - resultCtxCh := make(chan *ResultContext) - errCh := make(chan error) - go func() { - solveOpt := solveOpt - solveOpt.Ref = "" - buildDoneCh := make(chan struct{}) - _, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { - doneErr := errors.Errorf("done") - ctx, cancel := context.WithCancelCause(ctx) - defer cancel(doneErr) - - // force evaluation of all targets in parallel - results := make(map[*pb.Definition]*gateway.Result) - resultsMu := sync.Mutex{} - eg, egCtx := errgroup.WithContext(ctx) - targets.EachRef(func(def *pb.Definition) error { - eg.Go(func() error { - res2, err := c.Solve(egCtx, gateway.SolveRequest{ - Evaluate: true, - Definition: def, - }) - if err != nil { - return err - } - resultsMu.Lock() - results[def] = res2 - resultsMu.Unlock() - return nil - }) - return nil - }) - resultCtx := ResultContext{} - if err := eg.Wait(); err != nil { + if err != nil { var se *errdefs.SolveError if errors.As(err, &se) { - resultCtx.solveErr = se - } else { - return nil, err + resCtx := &ResultContext{ + done: make(chan struct{}), + solveErr: se, + gwClient: c, + gwCtx: ctx, + } + close(done) + close(baseCh) + resCtxCh <- resCtx + errCh <- err + select { + case <-resCtx.done: + case <-ctx.Done(): + } } } - res2, _ := result.ConvertResult(targets, func(def *pb.Definition) (gateway.Reference, error) { - if res, ok := results[def]; ok { - return res.Ref, nil - } - return nil, nil - }) + return res, err + }, ch) + select { + case _, ok := <-done: + if !ok { + cancel() + return + } + default: + } + close(baseCh) + + if resp != nil { + respCh <- resp + } + if err != nil { + resCtxCh <- nil + errCh <- err + cancel() + return + } - // Record the client and ctx as well so that containers can be created from the SolveError. - resultCtx.res = res2 - resultCtx.gwClient = c - resultCtx.gwCtx = ctx - resultCtx.gwDone = func() { - cancel(doneErr) - // wait for Build() completion(or timeout) to ensure the Build's finalizing and avoiding an error "context canceled" + go func() { + def, err := getDefinition(ctx, res) + if err != nil { + resCtxCh <- nil + errCh <- err + cancel() + return + } + + // NOTE: ideally this second connection should be lazily opened + // NOTE: or... we could open this session *during* the other + // session, which would let us grab the same vertices + // immediately, mitigating the risk for "inconsistent solve + // result" + opt := opt + opt.Ref = "" + _, err = cc.Build(ctx, opt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + res, err := evalDefinition(ctx, c, def) + if err != nil { + return nil, errors.Wrap(err, "inconsistent solve result") + } + resCtx := &ResultContext{ + done: make(chan struct{}), + res: res, + gwClient: c, + gwCtx: ctx, + } + close(done) + resCtxCh <- resCtx select { - case <-buildDoneCh: - case <-time.After(5 * time.Second): + case <-resCtx.done: + return nil, nil + case <-ctx.Done(): + return nil, ctx.Err() } - } + }, nil) select { - case resultCtxCh <- &resultCtx: - case <-ctx.Done(): - return nil, ctx.Err() + case _, ok := <-done: + if !ok { + cancel() + return + } + default: } + if err != nil { + resCtxCh <- nil + errCh <- err + } + cancel() + }() + }() - // wait for cleanup or cancel - <-ctx.Done() - if context.Cause(ctx) != doneErr { // doneErr is not an error. - return nil, ctx.Err() + var resCtx *ResultContext + var haveResCtx bool + var resp *client.SolveResponse + var err error + for { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case resCtx = <-resCtxCh: + haveResCtx = true + if resp != nil || err != nil { + return resCtx, resp, err } - return nil, nil - }, ch) - close(buildDoneCh) + case resp = <-respCh: + if haveResCtx { + return resCtx, resp, err + } + case err = <-errCh: + if haveResCtx { + return resCtx, nil, err + } + } + } +} + +func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb.Definition], error) { + return result.ConvertResult(res, func(ref gateway.Reference) (*pb.Definition, error) { + st, err := ref.ToState() if err != nil { - errCh <- err + return nil, err } - }() + def, err := st.Marshal(ctx) + if err != nil { + return nil, err + } + return def.ToPB(), nil + }) +} - select { - case resultCtx := <-resultCtxCh: - return resultCtx, nil - case err := <-errCh: +func evalDefinition(ctx context.Context, c gateway.Client, defs *result.Result[*pb.Definition]) (*gateway.Result, error) { + // force evaluation of all targets in parallel + results := make(map[*pb.Definition]*gateway.Result) + resultsMu := sync.Mutex{} + eg, egCtx := errgroup.WithContext(ctx) + defs.EachRef(func(def *pb.Definition) error { + eg.Go(func() error { + res2, err := c.Solve(egCtx, gateway.SolveRequest{ + Evaluate: true, + Definition: def, + }) + if err != nil { + return err + } + resultsMu.Lock() + results[def] = res2 + resultsMu.Unlock() + return nil + }) + return nil + }) + if err := eg.Wait(); err != nil { return nil, err - case <-ctx.Done(): - return nil, ctx.Err() } + res2, _ := result.ConvertResult(defs, func(def *pb.Definition) (gateway.Reference, error) { + if res, ok := results[def]; ok { + return res.Ref, nil + } + return nil, nil + }) + return res2, nil } // ResultContext is a build result with the client that built it. @@ -163,17 +250,18 @@ type ResultContext struct { res *gateway.Result solveErr *errdefs.SolveError - gwClient gateway.Client - gwCtx context.Context - gwDone func() - gwDoneOnce sync.Once + done chan struct{} + doneOnce sync.Once + + gwClient gateway.Client + gwCtx context.Context cleanups []func() cleanupsMu sync.Mutex } func (r *ResultContext) Done() { - r.gwDoneOnce.Do(func() { + r.doneOnce.Do(func() { r.cleanupsMu.Lock() cleanups := r.cleanups r.cleanups = nil @@ -181,7 +269,9 @@ func (r *ResultContext) Done() { for _, f := range cleanups { f() } - r.gwDone() + + close(r.done) + <-r.gwCtx.Done() }) }