diff --git a/commands/dap.go b/commands/dap.go index 2b975d000ed7..bdf0ef88f0ce 100644 --- a/commands/dap.go +++ b/commands/dap.go @@ -2,12 +2,17 @@ package commands import ( "context" + "io" + "net" + "os" + "github.com/containerd/console" "github.com/docker/buildx/dap" "github.com/docker/buildx/dap/common" "github.com/docker/buildx/util/cobrautil" "github.com/docker/buildx/util/ioset" "github.com/docker/buildx/util/progress" + "github.com/docker/cli/cli" "github.com/docker/cli/cli/command" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -24,6 +29,8 @@ func dapCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command { dapBuildCmd := buildCmd(dockerCli, rootOpts, &options) dapBuildCmd.Args = cobra.RangeArgs(0, 1) cmd.AddCommand(dapBuildCmd) + + cmd.AddCommand(dapAttachCmd()) return cmd } @@ -71,3 +78,39 @@ func (d *adapterProtocolDebugger) Stop() error { defer d.conn.Close() return d.Adapter.Stop() } + +func dapAttachCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "attach PATH", + Short: "Attach to a container created by the dap evaluate request", + Args: cli.ExactArgs(1), + Hidden: true, + RunE: func(cmd *cobra.Command, args []string) error { + c, err := console.ConsoleFromFile(os.Stdout) + if err != nil { + return err + } + + if err := c.SetRaw(); err != nil { + return err + } + + conn, err := net.Dial("unix", args[0]) + if err != nil { + return err + } + + fwd := ioset.NewSingleForwarder() + fwd.SetReader(os.Stdin) + fwd.SetWriter(conn, func() io.WriteCloser { + return conn + }) + + if _, err := io.Copy(os.Stdout, conn); err != nil && !errors.Is(err, io.EOF) { + return err + } + return nil + }, + } + return cmd +} diff --git a/dap/adapter.go b/dap/adapter.go index 5e6d954e828b..be1e17423b40 100644 --- a/dap/adapter.go +++ b/dap/adapter.go @@ -30,6 +30,7 @@ type Adapter[C LaunchConfig] struct { initialized chan struct{} started chan launchResponse[C] configuration chan struct{} + supportsExec bool evaluateReqCh chan *evaluateRequest @@ -104,6 +105,9 @@ func (d *Adapter[C]) Stop() error { func (d *Adapter[C]) Initialize(c Context, req *dap.InitializeRequest, resp *dap.InitializeResponse) error { close(d.initialized) + // Set parameters based on passed client capabilities. + d.supportsExec = req.Arguments.SupportsRunInTerminalRequest + // Set capabilities. resp.Body.SupportsConfigurationDoneRequest = true return nil @@ -262,6 +266,20 @@ func (d *Adapter[C]) getThread(id int) (t *thread) { return t } +func (d *Adapter[C]) getFirstThread() (t *thread) { + d.threadsMu.Lock() + defer d.threadsMu.Unlock() + + for _, thread := range d.threads { + if thread.isPaused() { + if t == nil || thread.id < t.id { + t = thread + } + } + } + return t +} + func (d *Adapter[C]) deleteThread(ctx Context, t *thread) { d.threadsMu.Lock() if t := d.threads[t.id]; t != nil { @@ -454,6 +472,7 @@ func (d *Adapter[C]) dapHandler() Handler { StackTrace: d.StackTrace, Scopes: d.Scopes, Variables: d.Variables, + Evaluate: d.Evaluate, Source: d.Source, } } diff --git a/dap/eval.go b/dap/eval.go new file mode 100644 index 000000000000..0ed410b61b14 --- /dev/null +++ b/dap/eval.go @@ -0,0 +1,153 @@ +package dap + +import ( + "context" + "fmt" + "net" + "os" + "path/filepath" + + "github.com/docker/buildx/build" + "github.com/docker/cli/cli-plugins/metadata" + "github.com/google/go-dap" + "github.com/google/shlex" + "github.com/pkg/errors" + "github.com/spf13/cobra" +) + +func (d *Adapter[C]) Evaluate(ctx Context, req *dap.EvaluateRequest, resp *dap.EvaluateResponse) error { + if req.Arguments.Context != "repl" { + return errors.Errorf("unsupported evaluate context: %s", req.Arguments.Context) + } + + args, err := shlex.Split(req.Arguments.Expression) + if err != nil { + return errors.Wrapf(err, "cannot parse expression") + } + + if len(args) == 0 { + return nil + } + + var t *thread + if req.Arguments.FrameId > 0 { + if t = d.getThreadByFrameID(req.Arguments.FrameId); t == nil { + return errors.Errorf("no thread with frame id %d", req.Arguments.FrameId) + } + } else { + if t = d.getFirstThread(); t == nil { + return errors.New("no paused thread") + } + } + + cmd := d.replCommands(ctx, t, resp) + cmd.SetArgs(args) + cmd.SetErr(d.Out()) + if err := cmd.Execute(); err != nil { + fmt.Fprintf(d.Out(), "ERROR: %+v\n", err) + } + return nil +} + +func (d *Adapter[C]) replCommands(ctx Context, t *thread, resp *dap.EvaluateResponse) *cobra.Command { + rootCmd := &cobra.Command{} + + execCmd := &cobra.Command{ + Use: "exec", + RunE: func(cmd *cobra.Command, args []string) error { + if !d.supportsExec { + return errors.New("cannot exec without runInTerminal client capability") + } + return t.Exec(ctx, args, resp) + }, + } + rootCmd.AddCommand(execCmd) + return rootCmd +} + +func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (retErr error) { + cfg := &build.InvokeConfig{Tty: true} + if len(cfg.Entrypoint) == 0 && len(cfg.Cmd) == 0 { + cfg.Entrypoint = []string{"/bin/sh"} // launch shell by default + cfg.Cmd = []string{} + cfg.NoCmd = false + } + + ctr, err := build.NewContainer(ctx, t.rCtx, cfg) + if err != nil { + return err + } + defer func() { + if retErr != nil { + ctr.Cancel() + } + }() + + dir, err := os.MkdirTemp("", "buildx-dap-exec") + if err != nil { + return err + } + defer func() { + if retErr != nil { + os.RemoveAll(dir) + } + }() + + socketPath := filepath.Join(dir, "s.sock") + l, err := net.Listen("unix", socketPath) + if err != nil { + return err + } + + go func() { + defer os.RemoveAll(dir) + t.runExec(l, ctr, cfg) + }() + + // TODO: this should work in standalone mode too. + docker := os.Getenv(metadata.ReexecEnvvar) + req := &dap.RunInTerminalRequest{ + Request: dap.Request{ + Command: "runInTerminal", + }, + Arguments: dap.RunInTerminalRequestArguments{ + Kind: "integrated", + Args: []string{docker, "buildx", "dap", "attach", socketPath}, + Env: map[string]any{ + "BUILDX_EXPERIMENTAL": "1", + }, + }, + } + + resp := ctx.Request(req) + if !resp.GetResponse().Success { + return errors.New(resp.GetResponse().Message) + } + + eresp.Body.Result = fmt.Sprintf("Started process attached to %s.", socketPath) + return nil +} + +func (t *thread) runExec(l net.Listener, ctr *build.Container, cfg *build.InvokeConfig) { + defer l.Close() + defer ctr.Cancel() + + conn, err := l.Accept() + if err != nil { + return + } + defer conn.Close() + + // start a background goroutine to politely refuse any subsequent connections. + go func() { + for { + conn, err := l.Accept() + if err != nil { + return + } + fmt.Fprint(conn, "Error: Already connected to exec instance.") + conn.Close() + } + }() + ctr.Exec(context.Background(), cfg, conn, conn, conn) +} diff --git a/dap/handler.go b/dap/handler.go index 798205a42de0..f5b1d5f71cc6 100644 --- a/dap/handler.go +++ b/dap/handler.go @@ -12,6 +12,7 @@ type Context interface { context.Context C() chan<- dap.Message Go(f func(c Context)) bool + Request(req dap.RequestMessage) dap.ResponseMessage } type dispatchContext struct { @@ -28,6 +29,14 @@ func (c *dispatchContext) Go(f func(c Context)) bool { return c.srv.Go(f) } +func (c *dispatchContext) Request(req dap.RequestMessage) dap.ResponseMessage { + respCh := make(chan dap.ResponseMessage, 1) + c.srv.doRequest(c, req, func(c Context, resp dap.ResponseMessage) { + respCh <- resp + }) + return <-respCh +} + type HandlerFunc[Req dap.RequestMessage, Resp dap.ResponseMessage] func(c Context, req Req, resp Resp) error func (h HandlerFunc[Req, Resp]) Do(c Context, req Req) (resp Resp, err error) { diff --git a/dap/server.go b/dap/server.go index b57d30b0c7e7..63ba606c4429 100644 --- a/dap/server.go +++ b/dap/server.go @@ -3,6 +3,7 @@ package dap import ( "context" "sync" + "sync/atomic" "github.com/google/go-dap" "github.com/pkg/errors" @@ -11,6 +12,8 @@ import ( var ErrServerStopped = errors.New("dap: server stopped") +type RequestCallback func(c Context, resp dap.ResponseMessage) + type Server struct { h Handler @@ -21,6 +24,8 @@ type Server struct { ctx context.Context cancel context.CancelCauseFunc + seq atomic.Int64 + requests sync.Map initialized bool } @@ -73,14 +78,18 @@ func (s *Server) readLoop(conn Conn) error { switch m := m.(type) { case dap.RequestMessage: - if ok := s.dispatch(m); !ok { + if ok := s.dispatchRequest(m); !ok { + return nil + } + case dap.ResponseMessage: + if ok := s.dispatchResponse(m); !ok { return nil } } } } -func (s *Server) dispatch(m dap.RequestMessage) bool { +func (s *Server) dispatchRequest(m dap.RequestMessage) bool { fn := func(c Context) { rmsg, err := s.handleMessage(c, m) if err != nil { @@ -95,6 +104,19 @@ func (s *Server) dispatch(m dap.RequestMessage) bool { return s.Go(fn) } +func (s *Server) dispatchResponse(m dap.ResponseMessage) bool { + fn := func(c Context) { + reqID := m.GetResponse().RequestSeq + if v, loaded := s.requests.LoadAndDelete(reqID); loaded { + callback := v.(RequestCallback) + s.Go(func(c Context) { + callback(c, m) + }) + } + } + return s.Go(fn) +} + func (s *Server) handleMessage(c Context, m dap.Message) (dap.ResponseMessage, error) { switch req := m.(type) { case *dap.InitializeRequest: @@ -156,20 +178,24 @@ func (s *Server) handleInitialize(c Context, req *dap.InitializeRequest) (*dap.I } func (s *Server) writeLoop(conn Conn, respCh <-chan dap.Message) error { - var seq int for m := range respCh { switch m := m.(type) { case dap.RequestMessage: - m.GetRequest().Seq = seq + if req := m.GetRequest(); req.Seq == 0 { + req.Seq = int(s.seq.Add(1)) + } m.GetRequest().Type = "request" case dap.EventMessage: - m.GetEvent().Seq = seq + if event := m.GetEvent(); event.Seq == 0 { + event.Seq = int(s.seq.Add(1)) + } m.GetEvent().Type = "event" case dap.ResponseMessage: - m.GetResponse().Seq = seq + if resp := m.GetResponse(); resp.Seq == 0 { + resp.Seq = int(s.seq.Add(1)) + } m.GetResponse().Type = "response" } - seq++ if err := conn.SendMsg(m); err != nil { return err @@ -209,6 +235,12 @@ func (s *Server) Go(fn func(c Context)) bool { return <-started } +func (s *Server) doRequest(c Context, req dap.RequestMessage, callback RequestCallback) { + req.GetRequest().Seq = int(s.seq.Add(1)) + s.requests.Store(req.GetRequest().Seq, callback) + c.C() <- req +} + func (s *Server) Stop() { s.mu.Lock() s.ch = nil diff --git a/dap/thread.go b/dap/thread.go index b2ba5cb08ffe..477287fd182f 100644 --- a/dap/thread.go +++ b/dap/thread.go @@ -85,7 +85,7 @@ func (t *thread) Evaluate(ctx Context, c gateway.Client, ref gateway.Reference, if step == stepContinue { t.setBreakpoints(ctx) } - pos, err := t.seekNext(ctx, step) + ref, pos, err := t.seekNext(ctx, step) event := t.needsDebug(pos, step, err) if event.Reason == "" { @@ -93,7 +93,7 @@ func (t *thread) Evaluate(ctx Context, c gateway.Client, ref gateway.Reference, } select { - case step = <-t.pause(ctx, err, event): + case step = <-t.pause(ctx, ref, err, event): if err != nil { return err } @@ -139,7 +139,7 @@ func (t *thread) needsDebug(target digest.Digest, step stepType, err error) (e d return } -func (t *thread) pause(c Context, err error, event dap.StoppedEventBody) <-chan stepType { +func (t *thread) pause(c Context, ref gateway.Reference, err error, event dap.StoppedEventBody) <-chan stepType { t.mu.Lock() defer t.mu.Unlock() @@ -148,7 +148,7 @@ func (t *thread) pause(c Context, err error, event dap.StoppedEventBody) <-chan } t.paused = make(chan stepType, 1) - t.rCtx = build.NewResultHandle(c, t.c, t.ref, t.meta, err) + t.rCtx = build.NewResultHandle(c, t.c, ref, t.meta, err) if err != nil { var solveErr *errdefs.SolveError if errors.As(err, &solveErr) { @@ -189,6 +189,13 @@ func (t *thread) resume(step stepType) { t.paused = nil } +func (t *thread) isPaused() bool { + t.mu.Lock() + defer t.mu.Unlock() + + return t.paused != nil +} + func (t *thread) StackTrace() []dap.StackFrame { t.mu.Lock() defer t.mu.Unlock() @@ -373,11 +380,11 @@ func (t *thread) propagateRegionDependencies() { } } -func (t *thread) seekNext(ctx Context, step stepType) (digest.Digest, error) { +func (t *thread) seekNext(ctx Context, step stepType) (gateway.Reference, digest.Digest, error) { // If we're at the end, return no digest to signal that // we should conclude debugging. if t.curPos == t.head { - return "", nil + return nil, "", nil } target := t.head @@ -389,15 +396,15 @@ func (t *thread) seekNext(ctx Context, step stepType) (digest.Digest, error) { } if target == "" { - return "", nil + return nil, "", nil } return t.seek(ctx, target) } -func (t *thread) seek(ctx Context, target digest.Digest) (digest.Digest, error) { +func (t *thread) seek(ctx Context, target digest.Digest) (gateway.Reference, digest.Digest, error) { ref, err := t.solve(ctx, target) if err != nil { - return "", err + return ref, "", err } if err = ref.Evaluate(ctx); err != nil { @@ -412,7 +419,7 @@ func (t *thread) seek(ctx Context, target digest.Digest) (digest.Digest, error) } else { t.curPos = target } - return t.curPos, err + return ref, t.curPos, err } func (t *thread) nextDigest(fn func(digest.Digest) bool) digest.Digest {