Skip to content

Commit

Permalink
analyzer: Have exited processes read all data
Browse files Browse the repository at this point in the history
This commit changes the behavior for analyzer processes so that
processes that have successfully exited without reading all the data
will continue to consume data from the byte stream insteading of
returning an error and putting a stop to the copy goroutine.

Closes #331
  • Loading branch information
mattnibs committed Jan 25, 2024
1 parent 23d2b6d commit 39e916b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 15 deletions.
40 changes: 25 additions & 15 deletions analyzer/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/brimdata/zed/zio"
Expand All @@ -31,12 +30,12 @@ func runProcesses(ctx context.Context, r io.Reader, confs ...Config) (*operation
var writers []io.Writer
group, ctx := errgroup.WithContext(ctx)
for _, conf := range confs {
cmd, writer, err := command(conf)
cmd, err := command(conf)
if err != nil {
return nil, err
}
group.Go(cmd.Run)
writers = append(writers, writer)
writers = append(writers, cmd)
}
writeCounter := new(writeCounter)
writers = append(writers, writeCounter)
Expand All @@ -47,12 +46,6 @@ func runProcesses(ctx context.Context, r io.Reader, confs ...Config) (*operation
closer.Close()
}
}
// Broken pipe errors and ErrClose are a result of a process
// shutting down. Return nil here since the process errors are
// more interesting.
if isPipe(err) || errors.Is(err, fs.ErrClosed) {
err = nil
}
return err
})
return &operation{
Expand All @@ -61,29 +54,46 @@ func runProcesses(ctx context.Context, r io.Reader, confs ...Config) (*operation
}, nil
}

func command(conf Config) (*wrappedCmd, io.WriteCloser, error) {
func command(conf Config) (*wrappedCmd, error) {
cmd := exec.Command(conf.Cmd, conf.Args...)
cmd.Dir = conf.WorkDir
pw, err := cmd.StdinPipe()
if err != nil {
return nil, nil, err
return nil, err
}
return &wrappedCmd{
Cmd: cmd,
stderrPath: conf.StderrPath,
stderrSaver: &prefixSuffixSaver{N: 32 << 10},
stdinWriter: pw,
stdoutPath: conf.StdoutPath,
stdoutSaver: &prefixSuffixSaver{N: 32 << 10},
}, pw, nil
}, nil
}

type wrappedCmd struct {
*exec.Cmd
stdoutPath string
stdoutSaver *prefixSuffixSaver
stderrPath string
stderrSaver *prefixSuffixSaver
wg sync.WaitGroup
stdinWriter io.WriteCloser
stdoutPath string
stdoutSaver *prefixSuffixSaver
}

func (c *wrappedCmd) Write(b []byte) (int, error) {
n, err := c.stdinWriter.Write(b)
// Broken pipe errors and ErrClose are a result of a process shutting down.
// Since this may be a case of the process legitimately exiting without
// reading all data, we ignore these errors and pretend the write was
// successful so as not to hold up data getting sent to other processes.
if isPipe(err) || errors.Is(err, fs.ErrClosed) {
return len(b), nil
}
return n, err
}

func (c *wrappedCmd) Close() error {
return c.stdinWriter.Close()
}

func (c *wrappedCmd) Run() error {
Expand Down
36 changes: 36 additions & 0 deletions cmd/brimcap/ztests/analyze-process-readall.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
script: |
mkdir wd1; mv noread.sh wd1
mkdir wd2; mv readall.sh wd2
bash gen.sh | brimcap analyze -config=config.yaml - | zq -z 'count()' -
inputs:
- name: alerts.pcap
- name: config.yaml
data: |
analyzers:
- cmd: bash
args: [noread.sh]
name: noread
globs: ["*.json"] # so ztail will not try to read all noread.sh
workdir: wd1
- cmd: bash
args: [readall.sh]
name: readall
globs: ["*.zson"]
workdir: wd2
- name: gen.sh
data: |
for i in {1..10000}; do
echo "{x: $i}"
done
- name: noread.sh
data: |
exit 0
- name: readall.sh
data: |
cat /dev/stdin > readall.zson
outputs:
- name: stdout
data: |
10000(uint64)

0 comments on commit 39e916b

Please sign in to comment.