From 97c3c560a8301792c69a8bb69ee3a0d4ffcace82 Mon Sep 17 00:00:00 2001
From: Matthew Nibecker
Date: Fri, 17 May 2024 15:34:15 -0700
Subject: [PATCH] Add query describe endpoint

This commit adds the query describe endpoint- a service endpoint that
returns information about a posted query. This endpoint is meant for
internal use from within ZUI and is not meant for public consumption.
---
 compiler/describe.go               |  29 ++++++
 compiler/describe/analyze.go       | 145 +++++++++++++++++++++++++++++
 compiler/optimizer/optimizer.go    |   4 +
 service/core.go                    |   1 +
 service/handlers.go                |  16 ++++
 service/ztests/query-describe.yaml |  76 +++++++++++++++
 6 files changed, 271 insertions(+)
 create mode 100644 compiler/describe.go
 create mode 100644 compiler/describe/analyze.go
 create mode 100644 service/ztests/query-describe.yaml

diff --git a/compiler/describe.go b/compiler/describe.go
new file mode 100644
index 0000000000..f664547270
--- /dev/null
+++ b/compiler/describe.go
@@ -0,0 +1,29 @@
+package compiler
+
+import (
+	"context"
+	"errors"
+
+	"github.com/brimdata/zed"
+	"github.com/brimdata/zed/compiler/data"
+	"github.com/brimdata/zed/compiler/describe"
+	"github.com/brimdata/zed/compiler/semantic"
+	"github.com/brimdata/zed/lakeparse"
+	"github.com/brimdata/zed/runtime"
+)
+
+func Describe(query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
+	seq, err := Parse(query)
+	if err != nil {
+		return nil, err
+	}
+	if len(seq) == 0 {
+		return nil, errors.New("internal error: AST seq cannot be empty")
+	}
+	rctx := runtime.NewContext(context.Background(), zed.NewContext())
+	entry, err := semantic.AnalyzeAddSource(rctx.Context, seq, src, head)
+	if err != nil {
+		return nil, err
+	}
+	return describe.Analyze(rctx.Context, src, entry)
+}
diff --git a/compiler/describe/analyze.go b/compiler/describe/analyze.go
new file mode 100644
index 0000000000..d516d8dbc3
--- /dev/null
+++ b/compiler/describe/analyze.go
@@ -0,0 +1,145 @@
+package describe
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/brimdata/zed/compiler/ast/dag"
+	"github.com/brimdata/zed/compiler/data"
+	"github.com/brimdata/zed/compiler/optimizer"
+	"github.com/brimdata/zed/lake"
+	"github.com/brimdata/zed/order"
+	"github.com/brimdata/zed/pkg/field"
+	"github.com/segmentio/ksuid"
+)
+
+type Info struct {
+	Sources      []Source        `json:"sources"`
+	Aggregations []*Aggregation  `json:"aggregations"`
+	SortKeys     []order.SortKey `json:"sort_keys"`
+}
+
+type Source interface {
+	Source()
+}
+
+type (
+	LakeMeta struct {
+		Kind string `json:"kind" unpack:""`
+		Meta string `json:"meta"`
+	}
+	Pool struct {
+		Kind string      `json:"kind" unpack:""`
+		Name string      `json:"name"`
+		ID   ksuid.KSUID `json:"id"`
+	}
+	Path struct {
+		Kind string `json:"kind" unpack:""`
+		URI  string `json:"uri"`
+	}
+)
+
+func (*LakeMeta) Source() {}
+func (*Pool) Source()     {}
+func (*Path) Source()     {}
+
+type Aggregation struct {
+	Keys field.List `json:"keys"`
+}
+
+func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
+	var info Info
+	var err error
+	if info.Sources, err = describeSources(ctx, source.Lake(), seq[0]); err != nil {
+		return nil, err
+	}
+	if info.SortKeys, err = optimizer.New(ctx, source).SortKeys(seq); err != nil {
+		return nil, err
+	}
+	info.Aggregations = describeAggs(seq, []*Aggregation{nil})
+	return &info, nil
+}
+
+func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
+	switch o := o.(type) {
+	case *dag.Fork:
+		var s []Source
+		for _, p := range o.Paths {
+			out, err := describeSources(ctx, lk, p[0])
+			if err != nil {
+				return nil, err
+			}
+			s = append(s, out...)
+		}
+		return s, nil
+	case *dag.DefaultScan:
+		return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
+	case *dag.FileScan:
+		return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
+	case *dag.HTTPScan:
+		return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
+	case *dag.PoolScan:
+		return sourceOfPool(ctx, lk, o.ID)
+	case *dag.Lister:
+		return sourceOfPool(ctx, lk, o.Pool)
+	case *dag.SeqScan:
+		return sourceOfPool(ctx, lk, o.Pool)
+	case *dag.CommitMetaScan:
+		return sourceOfPool(ctx, lk, o.Pool)
+	case *dag.LakeMetaScan:
+		return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
+	default:
+		return nil, fmt.Errorf("unsupported source type %T", o)
+	}
+}
+
+func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
+	if lk == nil {
+		panic(errors.New("internal error: lake operation cannot be used in non-lake context"))
+	}
+	p, err := lk.OpenPool(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+	return []Source{&Pool{
+		Kind: "Pool",
+		ID:   id,
+		Name: p.Name,
+	}}, nil
+}
+
+func describeAggs(seq dag.Seq, parents []*Aggregation) []*Aggregation {
+	for _, op := range seq {
+		parents = describeOpAggs(op, parents)
+	}
+	return parents
+}
+
+func describeOpAggs(op dag.Op, parents []*Aggregation) []*Aggregation {
+	switch op := op.(type) {
+	case *dag.Fork:
+		var aggs []*Aggregation
+		for _, p := range op.Paths {
+			aggs = append(aggs, describeAggs(p, []*Aggregation{nil})...)
+		}
+		return aggs
+	case *dag.Scatter:
+		var aggs []*Aggregation
+		for _, p := range op.Paths {
+			aggs = append(aggs, describeAggs(p, []*Aggregation{nil})...)
+		}
+		return aggs
+	case *dag.Summarize:
+		agg := new(Aggregation)
+		for _, k := range op.Keys {
+			agg.Keys = append(agg.Keys, k.LHS.(*dag.This).Path)
+		}
+		return []*Aggregation{agg}
+	}
+	// If more than one parent reset to nil aggregation.
+	if len(parents) > 1 {
+		return []*Aggregation{nil}
+	}
+	return parents
+}
diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go
index 5f6c90389c..45cdd6185b 100644
--- a/compiler/optimizer/optimizer.go
+++ b/compiler/optimizer/optimizer.go
@@ -185,6 +185,10 @@ func (o *Optimizer) OptimizeDeleter(seq dag.Seq, replicas int) (dag.Seq, error)
 	return dag.Seq{lister, scatter, merge}, nil
 }
 
+func (o *Optimizer) SortKeys(seq dag.Seq) ([]order.SortKey, error) {
+	return o.propagateSortKey(copyOps(seq), []order.SortKey{order.Nil})
+}
+
 func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
 	return walkEntries(seq, func(seq dag.Seq) (dag.Seq, error) {
 		if len(seq) == 0 {
diff --git a/service/core.go b/service/core.go
index 84faf0a0eb..5eb886d602 100644
--- a/service/core.go
+++ b/service/core.go
@@ -187,6 +187,7 @@ func (c *Core) addAPIServerRoutes() {
 	c.authhandle("/pool/{pool}/revision/{revision}/vector", handleVectorDelete).Methods("DELETE")
 	c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
 	c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
+	c.authhandle("/query/describe", handleQueryDescribe).Methods("OPTIONS", "POST")
 	c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
 }
 
diff --git a/service/handlers.go b/service/handlers.go
index 42995e7030..88913fc3fc 100644
--- a/service/handlers.go
+++ b/service/handlers.go
@@ -12,12 +12,14 @@ import (
 	"github.com/brimdata/zed/api/queryio"
 	"github.com/brimdata/zed/compiler"
 	"github.com/brimdata/zed/compiler/ast"
+	"github.com/brimdata/zed/compiler/data"
 	"github.com/brimdata/zed/compiler/optimizer/demand"
 	"github.com/brimdata/zed/lake"
 	lakeapi "github.com/brimdata/zed/lake/api"
 	"github.com/brimdata/zed/lake/commits"
 	"github.com/brimdata/zed/lake/journal"
 	"github.com/brimdata/zed/lakeparse"
+	"github.com/brimdata/zed/pkg/storage"
 	"github.com/brimdata/zed/runtime"
 	"github.com/brimdata/zed/runtime/exec"
 	"github.com/brimdata/zed/runtime/sam/op"
@@ -170,6 +172,20 @@ func handleCompile(c *Core, w *ResponseWriter, r *Request) {
 	w.Respond(http.StatusOK, ast)
 }
 
+func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
+	var req api.QueryRequest
+	if !r.Unmarshal(w, &req) {
+		return
+	}
+	src := data.NewSource(storage.NewRemoteEngine(), c.root)
+	info, err := compiler.Describe(req.Query, src, &req.Head)
+	if err != nil {
+		w.Error(srverr.ErrInvalid(err))
+		return
+	}
+	w.Respond(http.StatusOK, info)
+}
+
 func handleBranchGet(c *Core, w *ResponseWriter, r *Request) {
 	branchName, ok := r.StringFromPath(w, "branch")
 	if !ok {
diff --git a/service/ztests/query-describe.yaml b/service/ztests/query-describe.yaml
new file mode 100644
index 0000000000..390d51e4a8
--- /dev/null
+++ b/service/ztests/query-describe.yaml
@@ -0,0 +1,76 @@
+script: |
+  source service.sh
+  zed create -q test1
+  zed create -q test2
+  for file in multifrom.zed agg.zed; do
+    echo // === $file ===
+    query="$(cat $file | jq -Rsa .)"
+    curl -H "Accept: application/json" -d "{\"query\":$query,\"head\":{\"pool\":\"test1\"}}" $ZED_LAKE/query/describe |
+      zq -J 'sources := (over sources | id := "XXX")' -
+  done
+
+
+inputs:
+  - name: service.sh
+  - name: multifrom.zed
+    data: |
+      from (
+        pool test1
+        pool test2
+      ) | put foo := "bar"
+  - name: agg.zed
+    data: |
+      count() by key1:=v1, key2
+
+outputs:
+  - name: stdout
+    data: |
+      // === multifrom.zed ===
+      {
+        "sources": [
+          {
+            "kind": "Pool",
+            "name": "test1",
+            "id": "XXX"
+          },
+          {
+            "kind": "Pool",
+            "name": "test2",
+            "id": "XXX"
+          }
+        ],
+        "aggregations": [
+          null
+        ],
+        "sort_keys": [
+          {
+            "order": "desc",
+            "keys": [
+              [
+                "ts"
+              ]
+            ]
+          }
+        ]
+      }
+      // === agg.zed ===
+      {
+        "sources": {
+          "kind": "Pool",
+          "name": "test1",
+          "id": "XXX"
+        },
+        "aggregations": [
+          {
+            "keys": [
+              [
+                "key1"
+              ],
+              [
+                "key2"
+              ]
+            ]
+          }
+        ],
+        "sort_keys": null
+      }