Skip to content

Commit

Permalink
Send scanner stats back in search (#675)
Browse files Browse the repository at this point in the history
* Send scanner stats back in search

This commit tracks scanner stats in scanner.Scanner, and wires things
up so that they get sent back via the driver.Driver.
  • Loading branch information
henridf authored Apr 29, 2020
1 parent c355a51 commit e40934a
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 41 deletions.
12 changes: 6 additions & 6 deletions driver/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Compile(ctx context.Context, program ast.Proc, reader zbuf.Reader, reverse
func CompileWarningsCh(ctx context.Context, program ast.Proc, reader zbuf.Reader, reverse bool, span nano.Span, logger *zap.Logger, ch chan string) (*MuxOutput, error) {

filterAst, program := liftFilter(program)
input, err := inputProc(reader, filterAst, span)
scanner, err := newScanner(reader, filterAst, span)
if err != nil {
return nil, err
}
Expand All @@ -36,11 +36,11 @@ func CompileWarningsCh(ctx context.Context, program ast.Proc, reader zbuf.Reader
Reverse: reverse,
Warnings: ch,
}
leaves, err := proc.CompileProc(nil, program, pctx, input)
leaves, err := proc.CompileProc(nil, program, pctx, scanner)
if err != nil {
return nil, err
}
return NewMuxOutput(pctx, leaves), nil
return NewMuxOutput(pctx, leaves, scanner), nil
}

// liftFilter removes the filter at the head of the flowgraph AST, if
Expand All @@ -67,10 +67,10 @@ func liftFilter(p ast.Proc) (*ast.FilterProc, ast.Proc) {
return nil, p
}

// inputProc takes a Reader, optional Filter AST, and timespan, and
// constructs an input proc that can be used as the head of a
// newScanner takes a Reader, optional Filter AST, and timespan, and
// constructs a scanner that can be used as the head of a
// flowgraph.
func inputProc(reader zbuf.Reader, fltast *ast.FilterProc, span nano.Span) (proc.Proc, error) {
func newScanner(reader zbuf.Reader, fltast *ast.FilterProc, span nano.Span) (*scanner.Scanner, error) {
var f filter.Filter
if fltast != nil {
var err error
Expand Down
20 changes: 7 additions & 13 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,18 @@ import (
type Driver interface {
Warn(msg string) error
Write(channelID int, batch zbuf.Batch) error
ChannelEnd(channelID int, stats api.ScannerStats) error
ChannelEnd(channelID int) error
Stats(api.ScannerStats) error
}

func Run(out *MuxOutput, d Driver, statsTickCh <-chan time.Time) error {
//stats are zero at this point.
var stats api.ScannerStats
for !out.Complete() {
chunk := out.Pull(statsTickCh)
if chunk.Err != nil {
if chunk.Err == ErrTimeout {
/* not yet
err := d.sendStats(out.Stats())
if err != nil {
return d.abort(0, err)
if err := d.Stats(out.Stats()); err != nil {
return err
}
*/
continue
}
if chunk.Err == context.Canceled {
Expand All @@ -43,9 +38,8 @@ func Run(out *MuxOutput, d Driver, statsTickCh <-chan time.Time) error {
}
}
if chunk.Batch == nil {
// One of the flowgraph tails is done. We send stats and
// a done message for each channel that finishes
if err := d.ChannelEnd(chunk.ID, stats); err != nil {
// One of the flowgraph tails is done.
if err := d.ChannelEnd(chunk.ID); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -94,5 +88,5 @@ func (d *CLI) Warn(msg string) error {
return nil
}

func (d *CLI) ChannelEnd(int, api.ScannerStats) error { return nil }
func (d *CLI) Stats(api.ScannerStats) error { return nil }
func (d *CLI) ChannelEnd(int) error { return nil }
func (d *CLI) Stats(api.ScannerStats) error { return nil }
14 changes: 12 additions & 2 deletions driver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"time"

"github.com/brimsec/zq/proc"
"github.com/brimsec/zq/scanner"
"github.com/brimsec/zq/zbuf"
"github.com/brimsec/zq/zqd/api"
)

type MuxResult struct {
Expand All @@ -22,6 +24,7 @@ type MuxOutput struct {
muxProcs []*Mux
once sync.Once
in chan MuxResult
scanner *scanner.Scanner
}

type Mux struct {
Expand Down Expand Up @@ -63,16 +66,23 @@ func (m *Mux) run() {
}
}

func NewMuxOutput(ctx *proc.Context, parents []proc.Proc) *MuxOutput {
func NewMuxOutput(ctx *proc.Context, parents []proc.Proc, scanner *scanner.Scanner) *MuxOutput {
n := len(parents)
c := make(chan MuxResult, n)
mux := &MuxOutput{ctx: ctx, runners: n, in: c}
mux := &MuxOutput{ctx: ctx, runners: n, in: c, scanner: scanner}
for id, parent := range parents {
mux.muxProcs = append(mux.muxProcs, newMux(ctx, parent, id, c))
}
return mux
}

func (m *MuxOutput) Stats() api.ScannerStats {
if m.scanner == nil {
return api.ScannerStats{}
}
return m.scanner.Stats()
}

func (m *MuxOutput) Complete() bool {
return len(m.ctx.Warnings) == 0 && m.runners == 0
}
Expand Down
22 changes: 22 additions & 0 deletions scanner/scanner.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package scanner

import (
"sync/atomic"

"github.com/brimsec/zq/filter"
"github.com/brimsec/zq/pkg/nano"
"github.com/brimsec/zq/proc"
"github.com/brimsec/zq/zbuf"
"github.com/brimsec/zq/zng"
"github.com/brimsec/zq/zqd/api"
)

// Scanner implements the proc.Proc interface.
type Scanner struct {
reader zbuf.Reader
filter filter.Filter
span nano.Span
stats struct {
bytesRead int64
bytesMatched int64
recordsRead int64
recordsMatched int64
}
}

func NewScanner(reader zbuf.Reader, f filter.Filter, s nano.Span) *Scanner {
Expand All @@ -30,17 +39,30 @@ func (s *Scanner) Pull() (zbuf.Batch, error) {
return zbuf.ReadBatch(s, batchSize)
}

func (s *Scanner) Stats() api.ScannerStats {
return api.ScannerStats{
BytesRead: atomic.LoadInt64(&s.stats.bytesRead),
BytesMatched: atomic.LoadInt64(&s.stats.bytesMatched),
RecordsRead: atomic.LoadInt64(&s.stats.recordsRead),
RecordsMatched: atomic.LoadInt64(&s.stats.recordsMatched),
}
}

// Read implements zbuf.Reader.Read.
func (s *Scanner) Read() (*zng.Record, error) {
for {
rec, err := s.reader.Read()
if err != nil || rec == nil {
return nil, err
}
atomic.AddInt64(&s.stats.bytesRead, int64(len(rec.Raw)))
atomic.AddInt64(&s.stats.recordsRead, 1)
if s.span != nano.MaxSpan && !s.span.Contains(rec.Ts) ||
s.filter != nil && !s.filter(rec) {
continue
}
atomic.AddInt64(&s.stats.bytesMatched, int64(len(rec.Raw)))
atomic.AddInt64(&s.stats.recordsMatched, 1)
// Copy the underlying buffer (if volatile) because next call to
// reader.Next() may overwrite said buffer.
rec.CopyBody()
Expand Down
10 changes: 4 additions & 6 deletions zqd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,10 @@ type SearchStats struct {
}

type ScannerStats struct {
CurrentTs nano.Ts `json:"current_ts"`
BytesRead int64 `json:"bytes_read"`
BytesMatched int64 `json:"bytes_matched"`
RecordsRead int64 `json:"records_read"`
RecordsMatched int64 `json:"records_matched"`
RecordsReceived int64 `json:"records_received"`
BytesRead int64 `json:"bytes_read"`
BytesMatched int64 `json:"bytes_matched"`
RecordsRead int64 `json:"records_read"`
RecordsMatched int64 `json:"records_matched"`
}

type SpaceInfo struct {
Expand Down
55 changes: 48 additions & 7 deletions zqd/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,39 @@ func TestSearch(t *testing.T) {
sp, err := client.SpacePost(context.Background(), api.SpacePostRequest{Name: "test"})
require.NoError(t, err)
_ = postSpaceLogs(t, client, sp.Name, nil, false, src)
res := zngSearch(t, client, sp.Name, "*")
res := searchTzng(t, client, sp.Name, "*")
require.Equal(t, test.Trim(src), res)
}

func TestSearchStats(t *testing.T) {
src := `
#0:record[_path:string,ts:time]
0:[a;1;]
0:[b;1;]
`
_, client, done := newCore(t)
defer done()
sp, err := client.SpacePost(context.Background(), api.SpacePostRequest{Name: "test"})
require.NoError(t, err)
_ = postSpaceLogs(t, client, sp.Name, nil, false, src)
_, msgs := search(t, client, sp.Name, "_path != b")
var stats *api.SearchStats
for i := len(msgs) - 1; i >= 0; i-- {
if s, ok := msgs[i].(*api.SearchStats); ok {
stats = s
break
}
}
require.NotNil(t, stats)
assert.Equal(t, stats.Type, "SearchStats")
assert.Equal(t, stats.ScannerStats, api.ScannerStats{
BytesRead: 14,
BytesMatched: 7,
RecordsRead: 2,
RecordsMatched: 1,
})
}

func TestGroupByReverse(t *testing.T) {
src := `
#0:record[_path:string,ts:time,uid:bstring]
Expand All @@ -63,7 +92,7 @@ func TestGroupByReverse(t *testing.T) {
sp, err := client.SpacePost(context.Background(), api.SpacePostRequest{Name: "test"})
require.NoError(t, err)
_ = postSpaceLogs(t, client, sp.Name, nil, false, src)
res := zngSearch(t, client, sp.Name, "every 1s count()")
res := searchTzng(t, client, sp.Name, "every 1s count()")
require.Equal(t, test.Trim(counts), res)
}

Expand All @@ -74,7 +103,7 @@ func TestSearchEmptySpace(t *testing.T) {
defer done()
_, err := client.SpacePost(ctx, api.SpacePostRequest{Name: space})
require.NoError(t, err)
res := zngSearch(t, client, space, "*")
res := searchTzng(t, client, space, "*")
require.Equal(t, "", res)
}

Expand Down Expand Up @@ -334,7 +363,7 @@ func TestPostZngLogs(t *testing.T) {
assert.Equal(t, taskend.Type, "TaskEnd")
assert.Nil(t, taskend.Error)

res := zngSearch(t, client, spaceName, "*")
res := searchTzng(t, client, spaceName, "*")
require.Equal(t, strings.Join(append(src2, src1[1]), "\n"), strings.TrimSpace(res))

info, err := client.SpaceInfo(context.Background(), spaceName)
Expand Down Expand Up @@ -421,7 +450,7 @@ func TestPostNDJSONLogs(t *testing.T) {
assert.Equal(t, last.Type, "TaskEnd")
assert.Nil(t, last.Error)

res := zngSearch(t, client, spaceName, "*")
res := searchTzng(t, client, spaceName, "*")
require.Equal(t, expected, strings.TrimSpace(res))

span := nano.Span{Ts: 1e9, Dur: 1e9 + 1}
Expand Down Expand Up @@ -570,7 +599,10 @@ func TestDeleteDuringPacketPost(t *testing.T) {
require.Error(t, <-packetPostErr, "context canceled")
}

func zngSearch(t *testing.T, client *api.Connection, space, prog string) string {
// search runs the provided zql program as a search on the provided
// space, returning the tzng results along with a slice of all control
// messages that were received.
func search(t *testing.T, client *api.Connection, space, prog string) (string, []interface{}) {
parsed, err := zql.ParseProc(prog)
require.NoError(t, err)
proc, err := json.Marshal(parsed)
Expand All @@ -585,8 +617,17 @@ func zngSearch(t *testing.T, client *api.Connection, space, prog string) string
require.NoError(t, err)
buf := bytes.NewBuffer(nil)
w := zbuf.NopFlusher(tzngio.NewWriter(buf))
var msgs []interface{}
r.SetOnCtrl(func(i interface{}) {
msgs = append(msgs, i)
})
require.NoError(t, zbuf.Copy(w, r))
return buf.String()
return buf.String(), msgs
}

func searchTzng(t *testing.T, client *api.Connection, space, prog string) string {
tzng, _ := search(t, client, space, prog)
return tzng
}

func createTempDir(t *testing.T) string {
Expand Down
2 changes: 1 addition & 1 deletion zqd/handlers_zeek_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestPacketPostSuccess(t *testing.T) {
0:[1501770877.501001;]
0:[1501770877.471635;]
0:[1501770877.471635;]`
res := zngSearch(t, p.client, p.space, "cut ts")
res := searchTzng(t, p.client, p.space, "cut ts")
assert.Equal(t, test.Trim(expected), res)
})
t.Run("SpaceInfo", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions zqd/ingest/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ func (d *logdriver) Stats(stats api.ScannerStats) error {
return nil
}

func (d *logdriver) ChannelEnd(cid int, stats api.ScannerStats) error {
return d.Stats(stats)
func (d *logdriver) ChannelEnd(cid int) error {
return nil
}
9 changes: 5 additions & 4 deletions zqd/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (s *Search) Run(output Output) error {
d.abort(0, err)
return err
}
if err := d.Stats(s.mux.Stats()); err != nil {
d.abort(0, err)
return err
}
return d.end(0)
}

Expand Down Expand Up @@ -146,10 +150,7 @@ func (d *searchdriver) Stats(stats api.ScannerStats) error {
return d.output.SendControl(v)
}

func (d *searchdriver) ChannelEnd(cid int, stats api.ScannerStats) error {
if err := d.Stats(stats); err != nil {
return err
}
func (d *searchdriver) ChannelEnd(cid int) error {
v := &api.SearchEnd{
Type: "SearchEnd",
ChannelID: cid,
Expand Down

0 comments on commit e40934a

Please sign in to comment.