Skip to content

Commit

Permalink
Fix race condition in log monitoring
Browse files Browse the repository at this point in the history
For some reason we were using os.Pipe instead of io.Pipe for an
in-process pipe. I've switched those over to using io.Pipe so we can
CloseWithError.

Also, switched to using errgroup.Group instead of manual channel
signaling.

Now, when we hit an error, we actually log that error instead of the
line just before that error.

Signed-off-by: Jon Johnson <jon.johnson@chainguard.dev>
  • Loading branch information
jonjohnsonjr committed Feb 5, 2024
1 parent c222eaa commit 855a27c
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 58 deletions.
40 changes: 18 additions & 22 deletions pkg/container/docker_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ package container

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"

"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"

apko_build "chainguard.dev/apko/pkg/build"
apko_oci "chainguard.dev/apko/pkg/build/oci"
Expand Down Expand Up @@ -181,30 +182,25 @@ func (dk *docker) TempDir() string {

// waitForCommand waits for a command to complete in the pod.
func (dk *docker) waitForCommand(cfg *Config, ctx context.Context, attachResp types.HijackedResponse, taskIDResp types.IDResponse) error {
stdoutPipeR, stdoutPipeW, err := os.Pipe()
if err != nil {
return err
}

stderrPipeR, stderrPipeW, err := os.Pipe()
if err != nil {
return err
}

finishStdout := make(chan struct{})
finishStderr := make(chan struct{})

go monitorPipe(ctx, slog.LevelInfo, stdoutPipeR, finishStdout)
go monitorPipe(ctx, slog.LevelWarn, stderrPipeR, finishStderr)
_, err = stdcopy.StdCopy(stdoutPipeW, stderrPipeW, attachResp.Reader)
// log := clog.FromContext(ctx)
ctx, span := otel.Tracer("melange").Start(ctx, "waitForCommand")
defer span.End()

stdoutPipeW.Close()
stderrPipeW.Close()
stdoutPipeR, stdoutPipeW := io.Pipe()
stderrPipeR, stderrPipeW := io.Pipe()

<-finishStdout
<-finishStderr
var g errgroup.Group
g.Go(func() error {
return monitorPipe(ctx, slog.LevelInfo, stdoutPipeR)
})
g.Go(func() error {
return monitorPipe(ctx, slog.LevelWarn, stderrPipeR)
})
_, err := stdcopy.StdCopy(stdoutPipeW, stderrPipeW, attachResp.Reader)
stdoutPipeW.CloseWithError(err)
stderrPipeW.CloseWithError(err)

return err
return errors.Join(err, g.Wait())
}

// Run runs a Docker task given a Config and command string.
Expand Down
37 changes: 17 additions & 20 deletions pkg/container/kubernetes_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/imdario/mergo"
"github.com/kelseyhightower/envconfig"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -171,32 +172,28 @@ func (k *k8s) Run(ctx context.Context, cfg *Config, cmd ...string) error {
return fmt.Errorf("pod isn't running")
}

stdoutPipeR, stdoutPipeW, err := os.Pipe()
if err != nil {
return err
}
stderrPipeR, stderrPipeW, err := os.Pipe()
if err != nil {
return err
}
finishStdout := make(chan struct{})
finishStderr := make(chan struct{})
stdoutPipeR, stdoutPipeW := io.Pipe()
stderrPipeR, stderrPipeW := io.Pipe()

go monitorPipe(ctx, slog.LevelInfo, stdoutPipeR, finishStdout)
go monitorPipe(ctx, slog.LevelWarn, stderrPipeR, finishStderr)
var g errgroup.Group
g.Go(func() error {
return monitorPipe(ctx, slog.LevelInfo, stdoutPipeR)
})
g.Go(func() error {
return monitorPipe(ctx, slog.LevelWarn, stderrPipeR)
})

if err := k.Exec(ctx, cfg.PodID, cmd, remotecommand.StreamOptions{
err := k.Exec(ctx, cfg.PodID, cmd, remotecommand.StreamOptions{
Stdout: stdoutPipeW,
Stderr: stderrPipeW,
}); err != nil {
return fmt.Errorf("running remote command: %w", err)
}
})

stdoutPipeW.Close()
stderrPipeW.Close()
stdoutPipeW.CloseWithError(err)
stderrPipeW.CloseWithError(err)

<-finishStdout
<-finishStderr
if err != nil {
return fmt.Errorf("running remote command: %w", err)
}
return nil
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/container/monitor_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"github.com/chainguard-dev/clog"
)

func monitorPipe(ctx context.Context, level slog.Level, pipe io.ReadCloser, finish chan struct{}) {
func monitorPipe(ctx context.Context, level slog.Level, pipe io.Reader) error {
log := clog.FromContext(ctx)
defer pipe.Close()

scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
Expand All @@ -38,5 +37,5 @@ func monitorPipe(ctx context.Context, level slog.Level, pipe io.ReadCloser, fini
}
}

finish <- struct{}{}
return scanner.Err()
}
23 changes: 10 additions & 13 deletions pkg/container/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package container

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand All @@ -24,6 +25,7 @@ import (
apko_build "chainguard.dev/apko/pkg/build"
apko_types "chainguard.dev/apko/pkg/build/types"
v1 "github.com/google/go-containerregistry/pkg/v1"
"golang.org/x/sync/errgroup"
)

type Runner interface {
Expand Down Expand Up @@ -78,18 +80,13 @@ func monitorCmd(ctx context.Context, cfg *Config, cmd *exec.Cmd) error {
return err
}

finishStdout := make(chan struct{})
finishStderr := make(chan struct{})
var g errgroup.Group
g.Go(func() error {
return monitorPipe(ctx, slog.LevelInfo, stdout)
})
g.Go(func() error {
return monitorPipe(ctx, slog.LevelWarn, stderr)
})

go monitorPipe(ctx, slog.LevelInfo, stdout, finishStdout)
go monitorPipe(ctx, slog.LevelWarn, stderr, finishStderr)

if err := cmd.Wait(); err != nil {
return err
}

<-finishStdout
<-finishStderr

return nil
return errors.Join(g.Wait(), cmd.Wait())
}

0 comments on commit 855a27c

Please sign in to comment.