Skip to content

Commit

Permalink
Vectorize only if all data objects have vectors
Browse files Browse the repository at this point in the history
  • Loading branch information
nwt committed Dec 5, 2023
1 parent f2e061f commit 46d5a58
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
1 change: 1 addition & 0 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type (
SeqScan struct {
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Commit ksuid.KSUID `json:"commit"`
Fields []field.Path `json:"fields"`
Filter Expr `json:"filter"`
KeyPruner Expr `json:"key_pruner"`
Expand Down
2 changes: 1 addition & 1 deletion compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (j *Job) Parallelize(n int) error {
if err != nil {
return err
}
j.entry = j.optimizer.Vectorize(j.entry)
j.entry, err = j.optimizer.Vectorize(j.entry)
return err
}

Expand Down
1 change: 1 addition & 0 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
seq = append(seq, &dag.SeqScan{
Kind: "SeqScan",
Pool: op.ID,
Commit: op.Commit,
Filter: filter,
KeyPruner: lister.KeyPruner,
})
Expand Down
56 changes: 41 additions & 15 deletions compiler/optimizer/vam.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
package optimizer

import (
"context"

"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/pkg/field"
)

func (o *Optimizer) Vectorize(seq dag.Seq) dag.Seq {
return walk(seq, true, func(seq dag.Seq) dag.Seq {
if len(seq) >= 2 && isScan(seq[0]) {
if _, ok := IsCountByString(seq[1]); ok {
return vectorize(seq, 2)
}
if _, ok := IsSum(seq[1]); ok {
return vectorize(seq, 2)
}
func (o *Optimizer) Vectorize(seq dag.Seq) (dag.Seq, error) {
return walkEntries(seq, func(seq dag.Seq) (dag.Seq, error) {
if len(seq) < 2 {
return seq, nil
}
if ok, err := o.isScanWithVectors(seq[0]); !ok || err != nil {
return seq, err
}
if _, ok := IsCountByString(seq[1]); ok {
return vectorize(seq, 2), nil
}
if _, ok := IsSum(seq[1]); ok {
return vectorize(seq, 2), nil
}
return seq
return seq, nil
})
}

func (o *Optimizer) isScanWithVectors(op dag.Op) (bool, error) {
scan, ok := op.(*dag.SeqScan)
if !ok {
return false, nil
}
pool, err := o.lookupPool(scan.Pool)
if err != nil {
return false, err
}
snap, err := pool.Snapshot(context.TODO(), scan.Commit)
if err != nil {
return false, err
}
objects := snap.SelectAll()
if len(objects) == 0 {
return false, nil
}
for _, obj := range objects {
if !snap.HasVector(obj.ID) {
return false, nil
}
}
return true, nil
}

func vectorize(seq dag.Seq, n int) dag.Seq {
return append(dag.Seq{
&dag.Vectorize{
Expand All @@ -28,11 +59,6 @@ func vectorize(seq dag.Seq, n int) dag.Seq {
}, seq[n:]...)
}

func isScan(o dag.Op) bool {
_, ok := o.(*dag.SeqScan)
return ok
}

// IsCountByString returns whether o represents "count() by <top-level field>"
// along with the field name.
func IsCountByString(o dag.Op) (string, bool) {
Expand Down

0 comments on commit 46d5a58

Please sign in to comment.