diff --git a/client/build.go b/client/build.go new file mode 100644 index 000000000000..93059d1394a7 --- /dev/null +++ b/client/build.go @@ -0,0 +1,95 @@ +package client + +import ( + "context" + + "github.com/moby/buildkit/client/buildid" + gateway "github.com/moby/buildkit/frontend/gateway/client" + "github.com/moby/buildkit/frontend/gateway/grpcclient" + gatewayapi "github.com/moby/buildkit/frontend/gateway/pb" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/util/apicaps" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +func (c *Client) Build(ctx context.Context, opt SolveOpt, product string, buildFunc gateway.BuildFunc, statusChan chan *SolveStatus) (*SolveResponse, error) { + defer func() { + if statusChan != nil { + close(statusChan) + } + }() + + if opt.Frontend != "" { + return nil, errors.New("invalid SolveOpt, Build interface cannot use Frontend") + } + + if product == "" { + product = apicaps.ExportedProduct + } + + feOpts := opt.FrontendAttrs + opt.FrontendAttrs = nil + + workers, err := c.ListWorkers(ctx) + if err != nil { + return nil, errors.Wrap(err, "listing workers for Build") + } + var gworkers []gateway.WorkerInfo + for _, w := range workers { + gworkers = append(gworkers, gateway.WorkerInfo{ + ID: w.ID, + Labels: w.Labels, + Platforms: w.Platforms, + }) + } + + cb := func(ref string, s *session.Session) error { + g, err := grpcclient.New(ctx, feOpts, s.ID(), product, c.gatewayClientForBuild(ref), gworkers) + if err != nil { + return err + } + + if err := g.Run(ctx, buildFunc); err != nil { + return errors.Wrap(err, "failed to run Build function") + } + return nil + } + + return c.solve(ctx, nil, cb, opt, statusChan) +} + +func (c *Client) gatewayClientForBuild(buildid string) gatewayapi.LLBBridgeClient { + g := gatewayapi.NewLLBBridgeClient(c.conn) + return &gatewayClientForBuild{g, buildid} +} + +type gatewayClientForBuild struct { + gateway gatewayapi.LLBBridgeClient + buildID string +} + +func (g *gatewayClientForBuild) ResolveImageConfig(ctx context.Context, in *gatewayapi.ResolveImageConfigRequest, opts ...grpc.CallOption) (*gatewayapi.ResolveImageConfigResponse, error) { + ctx = buildid.AppendToOutgoingContext(ctx, g.buildID) + return g.gateway.ResolveImageConfig(ctx, in, opts...) +} + +func (g *gatewayClientForBuild) Solve(ctx context.Context, in *gatewayapi.SolveRequest, opts ...grpc.CallOption) (*gatewayapi.SolveResponse, error) { + ctx = buildid.AppendToOutgoingContext(ctx, g.buildID) + return g.gateway.Solve(ctx, in, opts...) +} + +func (g *gatewayClientForBuild) ReadFile(ctx context.Context, in *gatewayapi.ReadFileRequest, opts ...grpc.CallOption) (*gatewayapi.ReadFileResponse, error) { + ctx = buildid.AppendToOutgoingContext(ctx, g.buildID) + return g.gateway.ReadFile(ctx, in, opts...) +} + +func (g *gatewayClientForBuild) Ping(ctx context.Context, in *gatewayapi.PingRequest, opts ...grpc.CallOption) (*gatewayapi.PongResponse, error) { + ctx = buildid.AppendToOutgoingContext(ctx, g.buildID) + return g.gateway.Ping(ctx, in, opts...) +} + +func (g *gatewayClientForBuild) Return(ctx context.Context, in *gatewayapi.ReturnRequest, opts ...grpc.CallOption) (*gatewayapi.ReturnResponse, error) { + ctx = buildid.AppendToOutgoingContext(ctx, g.buildID) + return g.gateway.Return(ctx, in, opts...) +} diff --git a/client/build_test.go b/client/build_test.go new file mode 100644 index 000000000000..58b3e166cbc6 --- /dev/null +++ b/client/build_test.go @@ -0,0 +1,178 @@ +package client + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/frontend/gateway/client" + gatewayapi "github.com/moby/buildkit/frontend/gateway/pb" + "github.com/moby/buildkit/identity" + "github.com/moby/buildkit/util/testutil/integration" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestClientGatewayIntegration(t *testing.T) { + integration.Run(t, []integration.Test{ + testClientGatewaySolve, + testClientGatewayFailedSolve, + testClientGatewayEmptySolve, + testNoBuildID, + testUnknownBuildID, + }) +} + +func testClientGatewaySolve(t *testing.T, sb integration.Sandbox) { + t.Parallel() + requiresLinux(t) + + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + product := "buildkit_test" + optKey := "test-string" + + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + if c.BuildOpts().Product != product { + return nil, errors.Errorf("expected product %q, got %q", product, c.BuildOpts().Product) + } + opts := c.BuildOpts().Opts + testStr, ok := opts[optKey] + if !ok { + return nil, errors.Errorf(`build option %q missing`, optKey) + } + + run := llb.Image("busybox:latest").Run( + llb.ReadonlyRootFS(), + llb.Args([]string{"/bin/sh", "-ec", `echo -n '` + testStr + `' > /out/foo`}), + ) + st := run.AddMount("/out", llb.Scratch()) + + def, err := st.Marshal() + if err != nil { + return nil, errors.Wrap(err, "failed to marshal state") + } + + r, err := c.Solve(ctx, client.SolveRequest{ + Definition: def.ToPB(), + }) + if err != nil { + return nil, errors.Wrap(err, "failed to solve") + } + + read, err := r.Ref.ReadFile(ctx, client.ReadRequest{ + Filename: "/foo", + }) + if err != nil { + return nil, errors.Wrap(err, "failed to read result") + } + if testStr != string(read) { + return nil, errors.Errorf("read back %q, expected %q", string(read), testStr) + } + return r, nil + } + + tmpdir, err := ioutil.TempDir("", "buildkit") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + testStr := "This is a test" + + _, err = c.Build(ctx, SolveOpt{ + Exporter: ExporterLocal, + ExporterOutputDir: tmpdir, + FrontendAttrs: map[string]string{ + optKey: testStr, + }, + }, product, b, nil) + require.NoError(t, err) + + read, err := ioutil.ReadFile(filepath.Join(tmpdir, "foo")) + require.NoError(t, err) + require.Equal(t, testStr, string(read)) + + checkAllReleasable(t, c, sb, true) +} + +func testClientGatewayFailedSolve(t *testing.T, sb integration.Sandbox) { + t.Parallel() + requiresLinux(t) + + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + return nil, errors.New("expected to fail") + } + + _, err = c.Build(ctx, SolveOpt{}, "", b, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "expected to fail") +} + +func testClientGatewayEmptySolve(t *testing.T, sb integration.Sandbox) { + t.Parallel() + requiresLinux(t) + + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + b := func(ctx context.Context, c client.Client) (*client.Result, error) { + r, err := c.Solve(ctx, client.SolveRequest{}) + if err != nil { + return nil, errors.Wrap(err, "failed to solve") + } + if r.Ref != nil || r.Refs != nil || r.Metadata != nil { + return nil, errors.Errorf("got unexpected non-empty result %+v", r) + } + return r, nil + } + + _, err = c.Build(ctx, SolveOpt{}, "", b, nil) + require.NoError(t, err) +} + +func testNoBuildID(t *testing.T, sb integration.Sandbox) { + t.Parallel() + requiresLinux(t) + + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + g := gatewayapi.NewLLBBridgeClient(c.conn) + _, err = g.Ping(ctx, &gatewayapi.PingRequest{}) + require.Error(t, err) + require.Contains(t, err.Error(), "no buildid found in context") +} + +func testUnknownBuildID(t *testing.T, sb integration.Sandbox) { + t.Parallel() + requiresLinux(t) + + ctx := context.TODO() + + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + g := c.gatewayClientForBuild(t.Name() + identity.NewID()) + _, err = g.Ping(ctx, &gatewayapi.PingRequest{}) + require.Error(t, err) + require.Contains(t, err.Error(), "no such job") +} diff --git a/client/buildid/metadata.go b/client/buildid/metadata.go new file mode 100644 index 000000000000..bb169b8fe4ec --- /dev/null +++ b/client/buildid/metadata.go @@ -0,0 +1,29 @@ +package buildid + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +var metadataKey = "buildkit-controlapi-buildid" + +func AppendToOutgoingContext(ctx context.Context, id string) context.Context { + if id != "" { + return metadata.AppendToOutgoingContext(ctx, metadataKey, id) + } + return ctx +} + +func FromIncomingContext(ctx context.Context) string { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "" + } + + if ids := md.Get(metadataKey); len(ids) == 1 { + return ids[0] + } + + return "" +} diff --git a/client/solve.go b/client/solve.go index 840ce5e83517..8e1ae4962d45 100644 --- a/client/solve.go +++ b/client/solve.go @@ -54,6 +54,16 @@ func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, s return nil, errors.Errorf("invalid definition for frontend %s", opt.Frontend) } + return c.solve(ctx, def, nil, opt, statusChan) +} + +type runGatewayCB func(ref string, s *session.Session) error + +func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runGatewayCB, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) { + if def != nil && runGateway != nil { + return nil, errors.New("invalid with def and cb") + } + syncedDirs, err := prepareSyncedDirs(def, opt.LocalDirs) if err != nil { return nil, err @@ -112,8 +122,12 @@ func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, s return s.Run(statusContext, grpchijack.Dialer(c.controlClient())) }) + solveCtx, cancelSolve := context.WithCancel(ctx) var res *SolveResponse eg.Go(func() error { + ctx := solveCtx + defer cancelSolve() + defer func() { // make sure the Status ends cleanly on build errors go func() { <-time.After(3 * time.Second) @@ -150,6 +164,28 @@ func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, s return nil }) + if runGateway != nil { + eg.Go(func() error { + err := runGateway(ref, s) + if err == nil { + return nil + } + + // If the callback failed then the main + // `Solve` (called above) should error as + // well. However as a fallback we wait up to + // 5s for that to happen before failing this + // goroutine. + select { + case <-solveCtx.Done(): + case <-time.After(5 * time.Second): + cancelSolve() + } + + return err + }) + } + eg.Go(func() error { stream, err := c.controlClient().Status(statusContext, &controlapi.StatusRequest{ Ref: ref, diff --git a/control/control.go b/control/control.go index 1d598672edbf..0e6904dc8f00 100644 --- a/control/control.go +++ b/control/control.go @@ -9,6 +9,7 @@ import ( apitypes "github.com/moby/buildkit/api/types" "github.com/moby/buildkit/cache/remotecache" "github.com/moby/buildkit/client" + controlgateway "github.com/moby/buildkit/control/gateway" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/session" @@ -35,29 +36,34 @@ type Opt struct { } type Controller struct { // TODO: ControlService - opt Opt - solver *llbsolver.Solver - cache solver.CacheManager + opt Opt + solver *llbsolver.Solver + cache solver.CacheManager + gatewayForwarder *controlgateway.GatewayForwarder } func NewController(opt Opt) (*Controller, error) { cache := solver.NewCacheManager("local", opt.CacheKeyStorage, worker.NewCacheResultStorage(opt.WorkerController)) - solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFunc) + gatewayForwarder := controlgateway.NewGatewayForwarder() + + solver, err := llbsolver.New(opt.WorkerController, opt.Frontends, cache, opt.ResolveCacheImporterFunc, gatewayForwarder) if err != nil { return nil, errors.Wrap(err, "failed to create solver") } c := &Controller{ - opt: opt, - solver: solver, - cache: cache, + opt: opt, + solver: solver, + cache: cache, + gatewayForwarder: gatewayForwarder, } return c, nil } func (c *Controller) Register(server *grpc.Server) error { controlapi.RegisterControlServer(server, c) + c.gatewayForwarder.Register(server) return nil } diff --git a/control/gateway/gateway.go b/control/gateway/gateway.go new file mode 100644 index 000000000000..475d18b4a4dd --- /dev/null +++ b/control/gateway/gateway.go @@ -0,0 +1,127 @@ +package gateway + +import ( + "context" + "sync" + "time" + + "github.com/moby/buildkit/client/buildid" + "github.com/moby/buildkit/frontend/gateway" + gwapi "github.com/moby/buildkit/frontend/gateway/pb" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +type GatewayForwarder struct { + mu sync.RWMutex + updateCond *sync.Cond + builds map[string]gateway.LLBBridgeForwarder +} + +func NewGatewayForwarder() *GatewayForwarder { + gwf := &GatewayForwarder{ + builds: map[string]gateway.LLBBridgeForwarder{}, + } + gwf.updateCond = sync.NewCond(gwf.mu.RLocker()) + return gwf +} + +func (gwf *GatewayForwarder) Register(server *grpc.Server) { + gwapi.RegisterLLBBridgeServer(server, gwf) +} + +func (gwf *GatewayForwarder) RegisterBuild(ctx context.Context, id string, bridge gateway.LLBBridgeForwarder) error { + gwf.mu.Lock() + defer gwf.mu.Unlock() + + if _, ok := gwf.builds[id]; ok { + return errors.Errorf("build ID %s exists", id) + } + + gwf.builds[id] = bridge + gwf.updateCond.Broadcast() + + return nil +} + +func (gwf *GatewayForwarder) UnregisterBuild(ctx context.Context, id string) { + gwf.mu.Lock() + defer gwf.mu.Unlock() + + delete(gwf.builds, id) + gwf.updateCond.Broadcast() +} + +func (gwf *GatewayForwarder) lookupForwarder(ctx context.Context) (gateway.LLBBridgeForwarder, error) { + bid := buildid.FromIncomingContext(ctx) + if bid == "" { + return nil, errors.New("no buildid found in context") + } + + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + go func() { + <-ctx.Done() + gwf.updateCond.Broadcast() + }() + + gwf.mu.RLock() + defer gwf.mu.RUnlock() + for { + select { + case <-ctx.Done(): + return nil, errors.Errorf("no such job %s", bid) + default: + } + fwd, ok := gwf.builds[bid] + if !ok { + gwf.updateCond.Wait() + continue + } + return fwd, nil + } +} + +func (gwf *GatewayForwarder) ResolveImageConfig(ctx context.Context, req *gwapi.ResolveImageConfigRequest) (*gwapi.ResolveImageConfigResponse, error) { + fwd, err := gwf.lookupForwarder(ctx) + if err != nil { + return nil, errors.Wrap(err, "forwarding ResolveImageConfig") + } + + return fwd.ResolveImageConfig(ctx, req) +} + +func (gwf *GatewayForwarder) Solve(ctx context.Context, req *gwapi.SolveRequest) (*gwapi.SolveResponse, error) { + fwd, err := gwf.lookupForwarder(ctx) + if err != nil { + return nil, errors.Wrap(err, "forwarding Solve") + } + + return fwd.Solve(ctx, req) +} + +func (gwf *GatewayForwarder) ReadFile(ctx context.Context, req *gwapi.ReadFileRequest) (*gwapi.ReadFileResponse, error) { + fwd, err := gwf.lookupForwarder(ctx) + if err != nil { + return nil, errors.Wrap(err, "forwarding ReadFile") + } + return fwd.ReadFile(ctx, req) +} + +func (gwf *GatewayForwarder) Ping(ctx context.Context, req *gwapi.PingRequest) (*gwapi.PongResponse, error) { + fwd, err := gwf.lookupForwarder(ctx) + if err != nil { + return nil, errors.Wrap(err, "forwarding Ping") + } + return fwd.Ping(ctx, req) +} + +func (gwf *GatewayForwarder) Return(ctx context.Context, req *gwapi.ReturnRequest) (*gwapi.ReturnResponse, error) { + fwd, err := gwf.lookupForwarder(ctx) + if err != nil { + return nil, errors.Wrap(err, "forwarding Return") + } + res, err := fwd.Return(ctx, req) + return res, err +} diff --git a/examples/build-using-dockerfile/main.go b/examples/build-using-dockerfile/main.go index fed7af59c9d0..c773e5b74dc5 100644 --- a/examples/build-using-dockerfile/main.go +++ b/examples/build-using-dockerfile/main.go @@ -11,6 +11,7 @@ import ( "github.com/containerd/console" "github.com/moby/buildkit/client" + dockerfile "github.com/moby/buildkit/frontend/dockerfile/builder" "github.com/moby/buildkit/util/appcontext" "github.com/moby/buildkit/util/appdefaults" "github.com/moby/buildkit/util/progress/progressui" @@ -39,6 +40,11 @@ By default, the built image is loaded to Docker. EnvVar: "BUILDKIT_HOST", Value: appdefaults.Address, }, + cli.BoolFlag{ + Name: "clientside-frontend", + Usage: "run dockerfile frontend client side, rather than builtin to buildkitd", + EnvVar: "BUILDKIT_CLIENTSIDE_FRONTEND", + }, } app.Flags = append([]cli.Flag{ cli.StringSliceFlag{ @@ -83,7 +89,12 @@ func action(clicontext *cli.Context) error { ch := make(chan *client.SolveStatus) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - _, err := c.Solve(ctx, nil, *solveOpt, ch) + var err error + if clicontext.Bool("clientside-frontend") { + _, err = c.Build(ctx, *solveOpt, "", dockerfile.Build, ch) + } else { + _, err = c.Solve(ctx, nil, *solveOpt, ch) + } return err }) eg.Go(func() error { @@ -124,6 +135,10 @@ func newSolveOpt(clicontext *cli.Context, w io.WriteCloser) (*client.SolveOpt, e "dockerfile": filepath.Dir(file), } + frontend := "dockerfile.v0" // TODO: use gateway + if clicontext.Bool("clientside-frontend") { + frontend = "" + } frontendAttrs := map[string]string{ "filename": filepath.Base(file), } @@ -145,7 +160,7 @@ func newSolveOpt(clicontext *cli.Context, w io.WriteCloser) (*client.SolveOpt, e }, ExporterOutput: w, LocalDirs: localDirs, - Frontend: "dockerfile.v0", // TODO: use gateway + Frontend: frontend, FrontendAttrs: frontendAttrs, }, nil } diff --git a/frontend/dockerfile/cmd/dockerfile-frontend/main.go b/frontend/dockerfile/cmd/dockerfile-frontend/main.go index 87921840ec40..8e2af51ff518 100644 --- a/frontend/dockerfile/cmd/dockerfile-frontend/main.go +++ b/frontend/dockerfile/cmd/dockerfile-frontend/main.go @@ -8,7 +8,7 @@ import ( ) func main() { - if err := grpcclient.Run(appcontext.Context(), dockerfile.Build); err != nil { + if err := grpcclient.RunFromEnvironment(appcontext.Context(), dockerfile.Build); err != nil { logrus.Errorf("fatal error: %+v", err) panic(err) } diff --git a/frontend/gateway/gateway.go b/frontend/gateway/gateway.go index 15aac8533148..dbe43d915578 100644 --- a/frontend/gateway/gateway.go +++ b/frontend/gateway/gateway.go @@ -215,26 +215,72 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten ReadonlyRootFS: readonly, }, rootFS, lbf.Stdin, lbf.Stdout, os.Stderr) - if lbf.err != nil { - return nil, lbf.err + if err != nil { + // An existing error (set via Return rpc) takes + // precedence over this error, which in turn takes + // precedence over a success reported via Return. + lbf.mu.Lock() + if lbf.err == nil { + lbf.result = nil + lbf.err = err + } + lbf.mu.Unlock() } - if err != nil { - return nil, err + return lbf.Result() +} + +func (lbf *llbBridgeForwarder) Done() <-chan struct{} { + return lbf.doneCh +} + +func (lbf *llbBridgeForwarder) setResult(r *frontend.Result, err error) (*pb.ReturnResponse, error) { + lbf.mu.Lock() + defer lbf.mu.Unlock() + + if (r == nil) == (err == nil) { + return nil, errors.New("gateway return must be either result or err") + } + + if lbf.result != nil || lbf.err != nil { + return nil, errors.New("gateway result is already set") + } + + lbf.result = r + lbf.err = err + close(lbf.doneCh) + return &pb.ReturnResponse{}, nil +} + +func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) { + lbf.mu.Lock() + defer lbf.mu.Unlock() + + if lbf.result == nil && lbf.err == nil { + return nil, errors.New("no result for incomplete build") + } + + if lbf.err != nil { + return nil, lbf.err } return lbf.result, nil } -func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos) (*llbBridgeForwarder, error) { +func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos) *llbBridgeForwarder { lbf := &llbBridgeForwarder{ callCtx: ctx, llbBridge: llbBridge, refs: map[string]solver.CachedResult{}, + doneCh: make(chan struct{}), pipe: newPipe(), workers: workers, } + return lbf +} +func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos) (*llbBridgeForwarder, error) { + lbf := NewBridgeForwarder(ctx, llbBridge, workers) server := grpc.NewServer() grpc_health_v1.RegisterHealthServer(server, health.NewServer()) pb.RegisterLLBBridgeServer(server, lbf) @@ -297,6 +343,12 @@ func (d dummyAddr) String() string { return "localhost" } +type LLBBridgeForwarder interface { + pb.LLBBridgeServer + Done() <-chan struct{} + Result() (*frontend.Result, error) +} + type llbBridgeForwarder struct { mu sync.Mutex callCtx context.Context @@ -305,6 +357,7 @@ type llbBridgeForwarder struct { // lastRef solver.CachedResult // lastRefs map[string]solver.CachedResult // err error + doneCh chan struct{} // closed when result or err become valid through a call to a Return result *frontend.Result err error exporterAttr map[string][]byte @@ -465,13 +518,13 @@ func (lbf *llbBridgeForwarder) Ping(context.Context, *pb.PingRequest) (*pb.PongR func (lbf *llbBridgeForwarder) Return(ctx context.Context, in *pb.ReturnRequest) (*pb.ReturnResponse, error) { if in.Error != nil { - lbf.err = status.ErrorProto(&spb.Status{ + return lbf.setResult(nil, status.ErrorProto(&spb.Status{ Code: in.Error.Code, Message: in.Error.Message, // Details: in.Error.Details, - }) + })) } else { - lbf.result = &frontend.Result{ + r := &frontend.Result{ Metadata: in.Result.Metadata, } @@ -481,7 +534,7 @@ func (lbf *llbBridgeForwarder) Return(ctx context.Context, in *pb.ReturnRequest) if err != nil { return nil, err } - lbf.result.Ref = ref + r.Ref = ref case *pb.Result_Refs: m := map[string]solver.CachedResult{} for k, v := range res.Refs.Refs { @@ -491,11 +544,10 @@ func (lbf *llbBridgeForwarder) Return(ctx context.Context, in *pb.ReturnRequest) } m[k] = ref } - lbf.result.Refs = m + r.Refs = m } + return lbf.setResult(r, nil) } - - return &pb.ReturnResponse{}, nil } func (lbf *llbBridgeForwarder) convertRef(id string) (solver.CachedResult, error) { diff --git a/frontend/gateway/grpcclient/client.go b/frontend/gateway/grpcclient/client.go index b7773cd18df1..deb81008b0b3 100644 --- a/frontend/gateway/grpcclient/client.go +++ b/frontend/gateway/grpcclient/client.go @@ -22,18 +22,11 @@ import ( const frontendPrefix = "BUILDKIT_FRONTEND_OPT_" -func current() (*grpcClient, error) { - if ep := product(); ep != "" { - apicaps.ExportedProduct = ep - } - - ctx, conn, err := grpcClientConn(context.Background()) - if err != nil { - return nil, err - } - - c := pb.NewLLBBridgeClient(conn) +type GrpcClient interface { + Run(context.Context, client.BuildFunc) error +} +func New(ctx context.Context, opts map[string]string, session, product string, c pb.LLBBridgeClient, w []client.WorkerInfo) (GrpcClient, error) { resp, err := c.Ping(ctx, &pb.PingRequest{}) if err != nil { return nil, err @@ -49,16 +42,29 @@ func current() (*grpcClient, error) { return &grpcClient{ client: c, - opts: opts(), - sessionID: sessionID(), - workers: workers(), - product: product(), + opts: opts, + sessionID: session, + workers: w, + product: product, caps: pb.Caps.CapSet(resp.FrontendAPICaps), llbCaps: opspb.Caps.CapSet(resp.LLBCaps), requests: map[string]*pb.SolveRequest{}, }, nil } +func current() (GrpcClient, error) { + if ep := product(); ep != "" { + apicaps.ExportedProduct = ep + } + + ctx, conn, err := grpcClientConn(context.Background()) + if err != nil { + return nil, err + } + + return New(ctx, opts(), sessionID(), product(), pb.NewLLBBridgeClient(conn), workers()) +} + func convertRef(ref client.Reference) (string, error) { if ref == nil { return "", nil @@ -70,20 +76,28 @@ func convertRef(ref client.Reference) (string, error) { return r.id, nil } -func Run(ctx context.Context, f client.BuildFunc) (retError error) { +func RunFromEnvironment(ctx context.Context, f client.BuildFunc) error { client, err := current() if err != nil { return errors.Wrapf(err, "failed to initialize client from environment") } + return client.Run(ctx, f) +} - res, err := f(ctx, client) - - export := client.caps.Supports(pb.CapReturnResult) == nil +func (c *grpcClient) Run(ctx context.Context, f client.BuildFunc) (retError error) { + export := c.caps.Supports(pb.CapReturnResult) == nil + var ( + res *client.Result + err error + ) if export { defer func() { req := &pb.ReturnRequest{} if retError == nil { + if res == nil { + res = &client.Result{} + } pbRes := &pb.Result{ Metadata: res.Metadata, } @@ -119,17 +133,17 @@ func Run(ctx context.Context, f client.BuildFunc) (retError error) { // Details: stp.Details, } } - if _, err := client.client.Return(ctx, req); err != nil && retError == nil { + if _, err := c.client.Return(ctx, req); err != nil && retError == nil { retError = err } }() } - if err != nil { + if res, err = f(ctx, c); err != nil { return err } - if err := client.caps.Supports(pb.CapReturnMap); len(res.Refs) > 1 && err != nil { + if err := c.caps.Supports(pb.CapReturnMap); len(res.Refs) > 1 && err != nil { return err } @@ -139,7 +153,7 @@ func Run(ctx context.Context, f client.BuildFunc) (retError error) { return errors.Wrapf(err, "failed to marshal return metadata") } - req, err := client.requestForRef(res.Ref) + req, err := c.requestForRef(res.Ref) if err != nil { return errors.Wrapf(err, "failed to find return ref") } @@ -147,7 +161,7 @@ func Run(ctx context.Context, f client.BuildFunc) (retError error) { req.Final = true req.ExporterAttr = exportedAttrBytes - if _, err := client.client.Solve(ctx, req); err != nil { + if _, err := c.client.Solve(ctx, req); err != nil { return errors.Wrapf(err, "failed to solve") } } @@ -235,10 +249,12 @@ func (c *grpcClient) requestForRef(ref client.Reference) (*pb.SolveRequest, erro } func (c *grpcClient) Solve(ctx context.Context, creq client.SolveRequest) (*client.Result, error) { - for _, md := range creq.Definition.Metadata { - for cap := range md.Caps { - if err := c.llbCaps.Supports(cap); err != nil { - return nil, err + if creq.Definition != nil { + for _, md := range creq.Definition.Metadata { + for cap := range md.Caps { + if err := c.llbCaps.Supports(cap); err != nil { + return nil, err + } } } } diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index a2a9f60365b8..464babaeb270 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -7,8 +7,10 @@ import ( "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/remotecache" "github.com/moby/buildkit/client" + controlgateway "github.com/moby/buildkit/control/gateway" "github.com/moby/buildkit/exporter" "github.com/moby/buildkit/frontend" + "github.com/moby/buildkit/frontend/gateway" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" @@ -32,18 +34,22 @@ type ExporterRequest struct { type ResolveWorkerFunc func() (worker.Worker, error) type Solver struct { + workerController *worker.Controller solver *solver.Solver resolveWorker ResolveWorkerFunc frontends map[string]frontend.Frontend resolveCacheImporter remotecache.ResolveCacheImporterFunc platforms []specs.Platform + gatewayForwarder *controlgateway.GatewayForwarder } -func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI remotecache.ResolveCacheImporterFunc) (*Solver, error) { +func New(wc *worker.Controller, f map[string]frontend.Frontend, cache solver.CacheManager, resolveCI remotecache.ResolveCacheImporterFunc, gatewayForwarder *controlgateway.GatewayForwarder) (*Solver, error) { s := &Solver{ + workerController: wc, resolveWorker: defaultResolver(wc), frontends: f, resolveCacheImporter: resolveCI, + gatewayForwarder: gatewayForwarder, } // executing is currently only allowed on default worker @@ -97,9 +103,29 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest j.SessionID = session.FromContext(ctx) - res, err := s.Bridge(j).Solve(ctx, req) - if err != nil { - return nil, err + var res *frontend.Result + if s.gatewayForwarder != nil && req.Definition == nil && req.Frontend == "" { + fwd := gateway.NewBridgeForwarder(ctx, s.Bridge(j), s.workerController) + if err := s.gatewayForwarder.RegisterBuild(ctx, id, fwd); err != nil { + return nil, err + } + defer s.gatewayForwarder.UnregisterBuild(ctx, id) + + var err error + select { + case <-fwd.Done(): + res, err = fwd.Result() + case <-ctx.Done(): + err = ctx.Err() + } + if err != nil { + return nil, err + } + } else { + res, err = s.Bridge(j).Solve(ctx, req) + if err != nil { + return nil, err + } } defer func() {