Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor build/controller API to use single client.Build call #1750

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
551 changes: 316 additions & 235 deletions build/build.go

Large diffs are not rendered by default.

157 changes: 49 additions & 108 deletions build/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"io"
"sync"
"sync/atomic"

controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
Expand All @@ -17,110 +16,20 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

func NewResultContext(c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) {
ctx := context.Background()
def, err := getDefinition(ctx, res)
if err != nil {
return nil, err
}
return getResultAt(ctx, c, solveOpt, def, nil)
}

func getDefinition(ctx context.Context, res *gateway.Result) (*pb.Definition, error) {
ref, err := res.SingleRef()
if err != nil {
return nil, err
}
st, err := ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
return def.ToPB(), nil
}

func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, target *pb.Definition, statusChan chan *client.SolveStatus) (*ResultContext, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// forward SolveStatus
done := new(atomic.Bool)
defer done.Store(true)
ch := make(chan *client.SolveStatus)
go func() {
for {
s := <-ch
if s == nil {
return
}
if done.Load() {
// Do not forward if the function returned because statusChan is possibly closed
continue
}
select {
case statusChan <- s:
case <-ctx.Done():
}
}
}()

// get result
resultCtxCh := make(chan *ResultContext)
errCh := make(chan error)
go func() {
_, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resultCtx := ResultContext{}
res2, err := c.Solve(ctx, gateway.SolveRequest{
Evaluate: true,
Definition: target,
})
if err != nil {
var se *errdefs.SolveError
if errors.As(err, &se) {
resultCtx.solveErr = se
} else {
return nil, err
}
}
// Record the client and ctx as well so that containers can be created from the SolveError.
resultCtx.res = res2
resultCtx.gwClient = c
resultCtx.gwCtx = ctx
resultCtx.gwDone = cancel
select {
case resultCtxCh <- &resultCtx:
case <-ctx.Done():
return nil, ctx.Err()
}
<-ctx.Done()
return nil, nil
}, ch)
if err != nil {
errCh <- err
}
}()

select {
case resultCtx := <-resultCtxCh:
return resultCtx, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

// ResultContext is a build result with the client that built it.
type ResultContext struct {
res *gateway.Result
solveErr *errdefs.SolveError
resp *client.SolveResponse
err error
suspend chan<- struct{}
suspendDone <-chan struct{}

hooks []func(ctx context.Context) error

gwRef *gateway.Result
gwErr *errdefs.SolveError

gwClient gateway.Client
gwCtx context.Context
Expand All @@ -131,7 +40,39 @@ type ResultContext struct {
cleanupsMu sync.Mutex
}

func (r *ResultContext) Done() {
func (r *ResultContext) hook(hook func(ctx context.Context) error) {
r.hooks = append(r.hooks, hook)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like called from goroutines so should be protected by lock.

}

func (r *ResultContext) Result(ctx context.Context) (*gateway.Result, *errdefs.SolveError) {
return r.gwRef, r.gwErr
}

func (r *ResultContext) Wait(ctx context.Context) (*client.SolveResponse, error) {
defer r.Close()

close(r.suspend)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-r.suspendDone:
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
for _, f := range r.hooks {
if err := f(ctx); err != nil {
return err
}
}
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
return r.resp, r.err
}
}

func (r *ResultContext) Close() {
r.gwDoneOnce.Do(func() {
r.cleanupsMu.Lock()
cleanups := r.cleanups
Expand All @@ -156,16 +97,16 @@ func (r *ResultContext) build(buildFunc gateway.BuildFunc) (err error) {
}

func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) {
if r.res != nil && r.solveErr == nil {
if r.gwRef != nil && r.gwErr == nil {
logrus.Debugf("creating container from successful build")
ccfg, err := containerConfigFromResult(ctx, r.res, c, *cfg)
ccfg, err := containerConfigFromResult(ctx, r.gwRef, c, *cfg)
if err != nil {
return containerCfg, err
}
containerCfg = *ccfg
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
ccfg, err := containerConfigFromError(r.solveErr, *cfg)
ccfg, err := containerConfigFromError(r.gwErr, *cfg)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available")
}
Expand All @@ -176,14 +117,14 @@ func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client

func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
processCfg := newStartRequest(stdin, stdout, stderr)
if r.res != nil && r.solveErr == nil {
if r.gwRef != nil && r.gwErr == nil {
logrus.Debugf("creating container from successful build")
if err := populateProcessConfigFromResult(&processCfg, r.res, *cfg); err != nil {
if err := populateProcessConfigFromResult(&processCfg, r.gwRef, *cfg); err != nil {
return processCfg, err
}
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil {
if err := populateProcessConfigFromError(&processCfg, r.gwErr, *cfg); err != nil {
return processCfg, err
}
}
Expand Down
Loading