Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a vector engine #4925

Merged
merged 8 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions cmd/zed/dev/vcache/agg/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package agg

import (
"errors"
"flag"

"github.com/brimdata/zed/cli/outputflags"
devvcache "github.com/brimdata/zed/cmd/zed/dev/vcache"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

// Agg is the charm spec for the "agg" dev command. Run takes the VNG object
// path as its first argument and the field to aggregate as its second, so the
// usage string lists them in that order.
var Agg = &charm.Spec{
	Name:  "agg",
	Usage: "agg [flags] path field",
	Short: "read a VNG file and run an aggregate as a test",
	Long: `
The agg command reads VNG vectors from
a VNG storage object (local file or s3 object) and outputs
the result of an aggregate function computed over the given field.

This command is most useful for testing the VNG vector cache.
`,
	New: newCommand,
}

// init adds the Agg spec to devvcache.Cmd when this package is imported
// (cmd/zed/main.go imports it for that side effect).
func init() {
devvcache.Cmd.Add(Agg)
}

// Command is the state for one invocation of the agg command.
type Command struct {
// Embedded parent command; Run calls Init through it.
*root.Command
// outputFlags holds the output options registered on the flag set in
// newCommand and is used by Run to open the output writer.
outputFlags outputflags.Flags
}

// newCommand constructs the agg Command on top of parent and registers the
// output flags on f. The parent must be a *root.Command; the type assertion
// panics otherwise.
func newCommand(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
	cmd := new(Command)
	cmd.Command = parent.(*root.Command)
	cmd.outputFlags.SetFlags(f)
	return cmd, nil
}

// Run executes the agg command. It expects exactly two arguments: args[0] is
// the path (URI) of the VNG object and args[1] is the name of the field to
// aggregate. The object is fetched through a vector cache backed by a local
// storage engine, the field is handed to vam.NewCountByString, and the
// aggregate's output is copied to the writer opened from the output flags.
//
// It returns an error if initialization fails, the argument count is wrong,
// the path cannot be parsed, the object cannot be fetched, or the output
// cannot be opened, written, or closed.
func (c *Command) Run(args []string) error {
	ctx, cleanup, err := c.Init(&c.outputFlags)
	if err != nil {
		return err
	}
	defer cleanup()
	if len(args) != 2 {
		// Only a single field is supported: args[1] is used verbatim below.
		return errors.New("vcache agg: must be run with a path argument followed by a single field")
	}
	uri, err := storage.ParseURI(args[0])
	if err != nil {
		return err
	}
	field := args[1]
	local := storage.NewLocalEngine()
	cache := vcache.NewCache(local)
	object, err := cache.Fetch(ctx, uri, ksuid.Nil)
	if err != nil {
		return err
	}
	defer object.Close()
	//XXX nil puller
	agg := vam.NewCountByString(object.LocalContext(), nil, field)
	writer, err := c.outputFlags.Open(ctx, local)
	if err != nil {
		return err
	}
	if err := zio.Copy(writer, zbuf.PullerReader(agg)); err != nil {
		// Best-effort close; the copy error is the one worth reporting.
		writer.Close()
		return err
	}
	return writer.Close()
}
11 changes: 6 additions & 5 deletions cmd/zed/dev/vcache/copy/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

Expand Down Expand Up @@ -67,9 +66,11 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
/*
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
*/
return writer.Close()
}
3 changes: 2 additions & 1 deletion cmd/zed/dev/vcache/project/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *Command) Run(args []string) error {
return err
}
defer object.Close()
projection, err := object.NewProjection(fields)
projection, err := vam.NewProjection(object, fields)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/zed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
_ "github.com/brimdata/zed/cmd/zed/dev/indexfile"
_ "github.com/brimdata/zed/cmd/zed/dev/indexfile/create"
_ "github.com/brimdata/zed/cmd/zed/dev/indexfile/lookup"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/agg"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/copy"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/project"
"github.com/brimdata/zed/cmd/zed/drop"
Expand Down
16 changes: 12 additions & 4 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ type (
Kind string `json:"kind" unpack:""`
Cflag bool `json:"cflag"`
}
// Vectorize executes its body using the vector engine.
Vectorize struct {
Kind string `json:"kind" unpack:""`
// Body is the operator sequence that is compiled for the vector engine.
Body Seq `json:"body"`
}
Yield struct {
Kind string `json:"kind" unpack:""`
Exprs []Expr `json:"exprs"`
Expand All @@ -165,10 +170,12 @@ type (
Kind string `json:"kind" unpack:""`
}
SeqScan struct {
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Filter Expr `json:"filter"`
KeyPruner Expr `json:"key_pruner"`
Kind string `json:"kind" unpack:""`
Pool ksuid.KSUID `json:"pool"`
Commit ksuid.KSUID `json:"commit"`
Fields []field.Path `json:"fields"`
Filter Expr `json:"filter"`
KeyPruner Expr `json:"key_pruner"`
}
Deleter struct {
Kind string `json:"kind" unpack:""`
Expand Down Expand Up @@ -296,6 +303,7 @@ func (*Join) OpNode() {}
func (*Shape) OpNode() {}
func (*Explode) OpNode() {}
func (*Over) OpNode() {}
func (*Vectorize) OpNode() {}
func (*Yield) OpNode() {}
func (*Merge) OpNode() {}
func (*Combine) OpNode() {}
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var unpacker = unpack.New(
UnaryExpr{},
Uniq{},
Var{},
Vectorize{},
VectorValue{},
Yield{},
)
Expand Down
4 changes: 4 additions & 0 deletions compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (j *Job) OptimizeDeleter(replicas int) error {
// Parallelize runs the optimizer's Parallelize pass over the job's entry with
// the given n and, if that succeeds, follows it with the optimizer's
// Vectorize pass. The result of each pass is stored back into j.entry, even
// when the pass returns an error.
func (j *Job) Parallelize(n int) error {
	var err error
	if j.entry, err = j.optimizer.Parallelize(j.entry, n); err != nil {
		return err
	}
	j.entry, err = j.optimizer.Vectorize(j.entry)
	return err
}

Expand Down
31 changes: 31 additions & 0 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/order"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/brimdata/zed/runtime/op/traverse"
"github.com/brimdata/zed/runtime/op/uniq"
"github.com/brimdata/zed/runtime/op/yield"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zson"
Expand Down Expand Up @@ -299,6 +301,8 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
return meta.NewDeleter(b.octx, parent, pool, filter, pruner, b.progress, b.deletes), nil
case *dag.Load:
return load.New(b.octx, b.source.Lake(), parent, v.Pool, v.Branch, v.Author, v.Message, v.Meta), nil
case *dag.Vectorize:
return b.compileVectorize(v.Body, parent)
default:
return nil, fmt.Errorf("unknown DAG operator type: %v", v)
}
Expand Down Expand Up @@ -665,3 +669,30 @@ func isEntry(seq dag.Seq) bool {
}
return false
}

// compileVectorize compiles seq, the body of a dag.Vectorize operator, into a
// chain of vam operators. The parent is first wrapped with
// vam.NewDematerializer, each operator in seq is then stacked on top of the
// previous one, and the final operator is wrapped with vam.NewMaterializer so
// that the result is again a zbuf.Puller.
//
// Only two operator kinds are handled:
//   - *dag.SeqScan becomes a vam.NewVecScanner over the scan's pool and fields.
//   - *dag.Summarize becomes vam.NewCountByString or vam.NewSum when
//     optimizer.IsCountByString or optimizer.IsSum recognizes it.
//
// Anything else yields an "internal error" rather than a panic.
func (b *Builder) compileVectorize(seq dag.Seq, parent zbuf.Puller) (zbuf.Puller, error) {
	vamParent := vam.NewDematerializer(parent)
	for _, o := range seq {
		switch o := o.(type) {
		case *dag.SeqScan:
			pool, err := b.lookupPool(o.Pool)
			if err != nil {
				return nil, err
			}
			// The scanner needs a zbuf.Puller parent. Use the checked form of
			// the assertion so a mismatch is reported as an internal error
			// like the other failure paths here instead of panicking.
			puller, ok := vamParent.(zbuf.Puller)
			if !ok {
				return nil, fmt.Errorf("internal error: vectorized scan parent is not a zbuf.Puller: %T", vamParent)
			}
			//XXX check VectorCache not nil
			vamParent = vam.NewVecScanner(b.octx, b.source.Lake().VectorCache(), puller, pool, o.Fields, nil, nil)
		case *dag.Summarize:
			if name, ok := optimizer.IsCountByString(o); ok {
				vamParent = vam.NewCountByString(b.octx.Zctx, vamParent, name)
			} else if name, ok := optimizer.IsSum(o); ok {
				vamParent = vam.NewSum(b.octx.Zctx, vamParent, name)
			} else {
				return nil, fmt.Errorf("internal error: unhandled dag.Summarize: %#v", o)
			}
		default:
			return nil, fmt.Errorf("internal error: unknown dag.Op: %#v", o)
		}
	}
	return vam.NewMaterializer(vamParent), nil
}
5 changes: 3 additions & 2 deletions compiler/optimizer/demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ func insertDemand(seq dag.Seq) dag.Seq {
demands := InferDemandSeqOut(seq)
return walk(seq, true, func(seq dag.Seq) dag.Seq {
for _, op := range seq {
// TODO
_, _ = demands, op
if s, ok := op.(*dag.SeqScan); ok {
s.Fields = demand.Fields(demands[op])
}
}
return seq
})
Expand Down
20 changes: 20 additions & 0 deletions compiler/optimizer/demand/demand.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package demand

import "github.com/brimdata/zed/pkg/field"

type Demand interface {
isDemand()
}
Expand Down Expand Up @@ -98,3 +100,21 @@ func GetKey(demand Demand, key string) Demand {
panic("Unreachable")
}
}

// Fields flattens a demand into the list of field paths it names. If d is not
// a keys demand the result is nil. Otherwise each key contributes either a
// single one-element path (when its nested demand yields no paths) or one
// path per nested path, prefixed with the key.
//
// NOTE(review): the result follows the iteration order of the keys value; if
// keys is a map type that order is unspecified — confirm whether callers need
// a stable order.
func Fields(d Demand) []field.Path {
	ks, isKeys := d.(keys)
	if !isKeys {
		return nil
	}
	var paths []field.Path
	for name, sub := range ks {
		nested := Fields(sub)
		if len(nested) == 0 {
			// Leaf: the key itself is the whole path.
			paths = append(paths, field.Path{name})
			continue
		}
		for _, rest := range nested {
			paths = append(paths, append(field.Path{name}, rest...))
		}
	}
	return paths
}
1 change: 1 addition & 0 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
seq = append(seq, &dag.SeqScan{
Kind: "SeqScan",
Pool: op.ID,
Commit: op.Commit,
Filter: filter,
KeyPruner: lister.KeyPruner,
})
Expand Down
Loading