Skip to content

Commit

Permalink
[supervisor] prevent slow clients to stale tasks in headless workspaces
Browse files Browse the repository at this point in the history
  • Loading branch information
akosyakov authored and roboquat committed Apr 15, 2022
1 parent 7a0529e commit 5d57819
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
17 changes: 8 additions & 9 deletions components/supervisor/pkg/supervisor/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,8 @@ func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup, successChan
}
}
}
var readTimeout time.Duration
if !tm.config.isHeadless() {
readTimeout = 5 * time.Second
}
resp, err := tm.terminalService.OpenWithOptions(ctx, openRequest, terminal.TermOptions{
ReadTimeout: readTimeout,
ReadTimeout: 5 * time.Second,
Title: t.title,
})
if err != nil {
Expand Down Expand Up @@ -413,15 +409,18 @@ func prebuildLogFileName(task *task, storeLocation string) string {
return logs.PrebuildLogFileName(storeLocation, task.Id)
}

func (tm *tasksManager) watch(task *task, terminal *terminal.Term) {
func (tm *tasksManager) watch(task *task, term *terminal.Term) {
if !tm.config.isHeadless() {
return
}

var (
terminalLog = log.WithField("pid", terminal.Command.Process.Pid)
stdout = terminal.Stdout.Listen()
start = time.Now()
terminalLog = log.WithField("pid", term.Command.Process.Pid)
stdout = term.Stdout.ListenWithOptions(terminal.TermListenOptions{
// ensure logging of entire task output
ReadTimeout: terminal.NoTimeout,
})
start = time.Now()
)
go func() {
defer stdout.Close()
Expand Down
28 changes: 25 additions & 3 deletions components/supervisor/pkg/terminal/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func newTerm(alias string, pty *os.File, cmd *exec.Cmd, options TermOptions) (*T

timeout := options.ReadTimeout
if timeout == 0 {
timeout = 1<<63 - 1
timeout = NoTimeout
}
res := &Term{
PTY: pty,
Expand Down Expand Up @@ -244,6 +244,9 @@ func newTerm(alias string, pty *os.File, cmd *exec.Cmd, options TermOptions) (*T
return res, nil
}

// NoTimeout means that listener can block read forever
var NoTimeout time.Duration = 1<<63 - 1

// TermOptions is a pseudo-terminal configuration.
type TermOptions struct {
// timeout after which a listener is dropped. Use 0 for no timeout.
Expand Down Expand Up @@ -375,6 +378,7 @@ var (

type multiWriterListener struct {
io.Reader
timeout time.Duration

closed bool
once sync.Once
Expand Down Expand Up @@ -413,22 +417,40 @@ func (closedTerminalListener) Read(p []byte) (n int, err error) {

var closedListener = io.NopCloser(closedTerminalListener{})

// TermListenOptions is a configuration to listen to the pseudo-terminal .
type TermListenOptions struct {
// timeout after which a listener is dropped. Use 0 for default timeout.
ReadTimeout time.Duration
}

// Listen listens in on the multi-writer stream.
func (mw *multiWriter) Listen() io.ReadCloser {
return mw.ListenWithOptions(TermListenOptions{
ReadTimeout: 0,
})
}

// Listen listens in on the multi-writer stream with given options.
func (mw *multiWriter) ListenWithOptions(options TermListenOptions) io.ReadCloser {
mw.mu.Lock()
defer mw.mu.Unlock()

if mw.closed {
return closedListener
}

timeout := options.ReadTimeout
if timeout == 0 {
timeout = mw.timeout
}
r, w := io.Pipe()
cchan, done, closeChan := make(chan []byte), make(chan struct{}, 1), make(chan struct{}, 1)
res := &multiWriterListener{
Reader: r,
cchan: cchan,
done: done,
closeChan: closeChan,
timeout: timeout,
}

recording := mw.recorder.Bytes()
Expand Down Expand Up @@ -489,13 +511,13 @@ func (mw *multiWriter) Write(p []byte) (n int, err error) {

select {
case lstr.cchan <- p:
case <-time.After(mw.timeout):
case <-time.After(lstr.timeout):
lstr.CloseWithError(ErrReadTimeout)
}

select {
case <-lstr.done:
case <-time.After(mw.timeout):
case <-time.After(lstr.timeout):
lstr.CloseWithError(ErrReadTimeout)
}
}
Expand Down

0 comments on commit 5d57819

Please sign in to comment.