diff --git a/analyzer/processes.go b/analyzer/processes.go index 26a0ba60..75f29903 100644 --- a/analyzer/processes.go +++ b/analyzer/processes.go @@ -12,7 +12,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "sync/atomic" "github.com/brimdata/zed/zio" @@ -31,12 +30,12 @@ func runProcesses(ctx context.Context, r io.Reader, confs ...Config) (*operation var writers []io.Writer group, ctx := errgroup.WithContext(ctx) for _, conf := range confs { - cmd, writer, err := command(conf) + cmd, err := command(conf) if err != nil { return nil, err } group.Go(cmd.Run) - writers = append(writers, writer) + writers = append(writers, cmd) } writeCounter := new(writeCounter) writers = append(writers, writeCounter) @@ -47,12 +46,6 @@ func runProcesses(ctx context.Context, r io.Reader, confs ...Config) (*operation closer.Close() } } - // Broken pipe errors and ErrClose are a result of a process - // shutting down. Return nil here since the process errors are - // more interesting. - if isPipe(err) || errors.Is(err, fs.ErrClosed) { - err = nil - } return err }) return &operation{ @@ -61,29 +54,47 @@ func runProcesses(ctx context.Context, r io.Reader, confs ...Config) (*operation }, nil } -func command(conf Config) (*wrappedCmd, io.WriteCloser, error) { +func command(conf Config) (*wrappedCmd, error) { cmd := exec.Command(conf.Cmd, conf.Args...) 
cmd.Dir = conf.WorkDir pw, err := cmd.StdinPipe() if err != nil { - return nil, nil, err + return nil, err } return &wrappedCmd{ Cmd: cmd, stderrPath: conf.StderrPath, stderrSaver: &prefixSuffixSaver{N: 32 << 10}, + stdinWriter: pw, stdoutPath: conf.StdoutPath, stdoutSaver: &prefixSuffixSaver{N: 32 << 10}, - }, pw, nil + }, nil } type wrappedCmd struct { *exec.Cmd + stdinWriter io.WriteCloser stdoutPath string stdoutSaver *prefixSuffixSaver stderrPath string stderrSaver *prefixSuffixSaver - wg sync.WaitGroup +} + +func (c *wrappedCmd) Write(b []byte) (int, error) { + n, err := c.stdinWriter.Write(b) + // Broken pipe errors and fs.ErrClosed are a result of a process + // shutting down. Since this may be a case of the process legitimately + // exiting without needing to read all data, we ignore these errors + // and pretend the write was successful so as not to hold up data getting + // sent to other processes. + if isPipe(err) || errors.Is(err, fs.ErrClosed) { + return len(b), nil + } + return n, err +} + +func (c *wrappedCmd) Close() error { + return c.stdinWriter.Close() } func (c *wrappedCmd) Run() error { diff --git a/cmd/brimcap/ztests/analyze-process-readall.yaml b/cmd/brimcap/ztests/analyze-process-readall.yaml new file mode 100644 index 00000000..e684d1a4 --- /dev/null +++ b/cmd/brimcap/ztests/analyze-process-readall.yaml @@ -0,0 +1,36 @@ +script: | + mkdir wd1; mv noread.sh wd1 + mkdir wd2; mv readall.sh wd2 + bash gen.sh | brimcap analyze -config=config.yaml - | zq -z 'count()' - + +inputs: + - name: alerts.pcap + - name: config.yaml + data: | + analyzers: + - cmd: bash + args: [noread.sh] + name: noread + globs: ["*.json"] # so ztail will not try to read all the other schtuff + workdir: wd1 + - cmd: bash + args: [readall.sh] + name: readall + globs: ["*.zson"] + workdir: wd2 + - name: gen.sh + data: | + for i in {1..10000}; do + echo "{x: $i}" + done + - name: noread.sh + data: | + exit 0 + - name: readall.sh + data: | + cat /dev/stdin > readall.zson + 
+outputs: + - name: stdout + data: | + 10000(uint64)