Skip to content

Commit

Permalink
interp: make read cancellable via Run's ctx
Browse files Browse the repository at this point in the history
- Add a CancelableReader type, and wrap stdin with it at the top of
  Runner.Run.
- Also include CancelableReaderTTY which preserves stdin looking like a
  tty, i.e. [[ -t 0 ]] is still true.
  • Loading branch information
theclapp committed May 5, 2022
1 parent 43a1600 commit 0f22758
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
5 changes: 5 additions & 0 deletions interp/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,11 @@ func (r *Runner) Run(ctx context.Context, node syntax.Node) error {
if !r.didReset {
r.Reset()
}
if r.stdin != nil {
if _, ok := r.stdin.(Canceler); !ok {
r.stdin = NewCancelableReader(ctx, r.stdin)
}
}
r.fillExpandConfig(ctx)
r.err = nil
r.shellExited = false
Expand Down
112 changes: 112 additions & 0 deletions interp/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package interp

import (
"context"
"io"
"sync"
)

var _ io.Reader = (*CancelableReader)(nil)
var _ io.Reader = (*CancelableReaderTTY)(nil)
var _ Canceler = (*CancelableReader)(nil)
var _ Canceler = (*CancelableReaderTTY)(nil)
var _ fder = (*CancelableReaderTTY)(nil)

type CancelableReader struct {
ctx context.Context
cancel context.CancelFunc
in chan []byte
out chan readResult
err error
r io.Reader
once sync.Once
}

type CancelableReaderTTY struct {
CancelableReader
}

type Canceler interface {
Cancel()
}

type readResult struct {
n int
err error
}

func NewCancelableReader(ctx context.Context, r io.Reader) io.Reader {
ctx, cancel := context.WithCancel(ctx)
c := CancelableReader{
r: r,
ctx: ctx,
cancel: cancel,
in: make(chan []byte),
out: make(chan readResult),
}
// Make sure [[ -t 0 ]] still works
if _, ok := r.(fder); ok {
return &CancelableReaderTTY{
CancelableReader: c,
}
}
return &c
}

// Read implements the io.Reader interface
func (c *CancelableReader) Read(p []byte) (int, error) {
c.once.Do(func() { go c.begin() })

// Send the buffer over to the reader goroutine
select {
case c.in <- p:
case <-c.ctx.Done():
return 0, c.ctx.Err()
}

// Get the output from the reader goroutine.
select {
case res, ok := <-c.out:
if !ok {
return 0, c.err
}
return res.n, res.err
case <-c.ctx.Done():
return 0, c.ctx.Err()
}
}

// Close implements the io.Closer interface.
func (c *CancelableReader) Close() error {
close(c.in)
if closer, ok := c.r.(io.Closer); ok {
return closer.Close()
}
return nil
}

func (c *CancelableReader) begin() {
for c.ctx.Err() == nil {
select {
case buf, ok := <-c.in:
if !ok {
return
}
n, err := c.r.Read(buf)
select {
case c.out <- readResult{n: n, err: err}:
case <-c.ctx.Done():
return
}
case <-c.ctx.Done():
}
}
}

func (c *CancelableReader) Cancel() {
c.cancel()
}

func (ct *CancelableReaderTTY) Fd() uintptr {
return ct.r.(fder).Fd()
}
6 changes: 5 additions & 1 deletion interp/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"mvdan.cc/sh/v3/syntax"
)

type fder interface {
Fd() uintptr
}

// non-empty string is true, empty string is false
func (r *Runner) bashTest(ctx context.Context, expr syntax.TestExpr, classic bool) string {
switch x := expr.(type) {
Expand Down Expand Up @@ -178,7 +182,7 @@ func (r *Runner) unTest(ctx context.Context, op syntax.UnTestOperator, x string)
case 2:
f = r.stderr
}
if f, ok := f.(interface{ Fd() uintptr }); ok {
if f, ok := f.(fder); ok {
// Support Fd methods such as the one on *os.File.
return term.IsTerminal(int(f.Fd()))
}
Expand Down

0 comments on commit 0f22758

Please sign in to comment.