Skip to content

Commit

Permalink
Add query describe endpoint
Browse files Browse the repository at this point in the history
This commit adds the query describe endpoint, a service endpoint that
returns information about a posted query. This endpoint is meant for
internal use from within ZUI and is not meant for public consumption.
  • Loading branch information
mattnibs committed May 17, 2024
1 parent dd2fa87 commit 97c3c56
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 0 deletions.
29 changes: 29 additions & 0 deletions compiler/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package compiler

import (
"context"
"errors"

"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/describe"
"github.com/brimdata/zed/compiler/semantic"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/runtime"
)

// Describe parses query, runs semantic analysis on it (adding a source based
// on src and head), and returns the describe.Info computed for the resulting
// DAG.
//
// An error is returned if the query fails to parse, parses to an empty
// sequence, fails semantic analysis, or cannot be analyzed by
// describe.Analyze.
//
// NOTE(review): the work runs under a context derived from
// context.Background(), so cancellation by the caller is not propagated.
func Describe(query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
	parsed, err := Parse(query)
	switch {
	case err != nil:
		return nil, err
	case len(parsed) == 0:
		return nil, errors.New("internal error: AST seq cannot be empty")
	}
	// Only the embedded context of the runtime context is needed here.
	ctx := runtime.NewContext(context.Background(), zed.NewContext()).Context
	dagSeq, err := semantic.AnalyzeAddSource(ctx, parsed, src, head)
	if err != nil {
		return nil, err
	}
	return describe.Analyze(ctx, src, dagSeq)
}
145 changes: 145 additions & 0 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package describe

import (
"context"
"errors"
"fmt"

"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/segmentio/ksuid"
)

// Info is the description of a query produced by Analyze. It is serialized
// to JSON using the field names given in the struct tags.
type Info struct {
// Sources lists the sources found by describeSources at the first
// operator of the query.
Sources []Source `json:"sources"`
// Aggregations is the result of describeAggs over the query. A nil
// entry means no Summarize operator was recorded for that entry.
Aggregations []*Aggregation `json:"aggregations"`
// SortKeys holds the sort keys reported by the optimizer's SortKeys
// method for the query.
SortKeys []order.SortKey `json:"sort_keys"`
}

// Source is the common interface of the source descriptions that can appear
// in Info.Sources. In this file it is implemented by *LakeMeta, *Pool, and
// *Path.
type Source interface {
// Source is a marker method; the implementations in this file have
// empty bodies.
Source()
}

// Concrete source descriptions. Each carries a Kind field that the code in
// this file sets to the type's own name ("LakeMeta", "Pool", or "Path").
// NOTE(review): the `unpack:""` tag presumably marks Kind as the type
// discriminator for the project's unpack machinery; confirm.
type (
// LakeMeta describes a lake metadata scan; Meta is copied from the
// Meta field of the dag.LakeMetaScan operator.
LakeMeta struct {
Kind string `json:"kind" unpack:""`
Meta string `json:"meta"`
}
// Pool describes a pool source: ID is the pool ID taken from the DAG
// operator and Name is the Name of the pool opened with that ID.
Pool struct {
Kind string `json:"kind" unpack:""`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
}
// Path describes a non-pool source. URI is the path of a file scan, the
// URL of an HTTP scan, or "stdio://stdin" for the default scan.
Path struct {
Kind string `json:"kind" unpack:""`
URI string `json:"uri"`
}
)

// The Source methods below make *LakeMeta, *Pool, and *Path satisfy the
// Source interface. They are intentionally empty.
func (*LakeMeta) Source() {}
func (*Pool) Source() {}
func (*Path) Source() {}

// Aggregation describes a dag.Summarize operator encountered in a query.
type Aggregation struct {
// Keys holds the field path of the left-hand side of each key of the
// Summarize operator, in the order the keys appear.
Keys field.List `json:"keys"`
}

// Analyze builds the Info description of the DAG seq.
//
// Sources are taken from the first operator of seq (resolving pools through
// source.Lake()), sort keys come from the optimizer's SortKeys method, and
// aggregations are collected by walking every operator of seq.
//
// An error is returned if seq is empty, if the sources cannot be described,
// or if sort key computation fails.
func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
	// The first operator is indexed below; reject an empty sequence with an
	// error rather than letting the index expression panic.
	if len(seq) == 0 {
		return nil, errors.New("internal error: DAG seq cannot be empty")
	}
	sources, err := describeSources(ctx, source.Lake(), seq[0])
	if err != nil {
		return nil, err
	}
	sortKeys, err := optimizer.New(ctx, source).SortKeys(seq)
	if err != nil {
		return nil, err
	}
	return &Info{
		Sources:  sources,
		SortKeys: sortKeys,
		// Start from a single nil entry, meaning "no aggregation seen yet".
		Aggregations: describeAggs(seq, []*Aggregation{nil}),
	}, nil
}

// describeSources returns descriptions of the sources read by o, which is
// expected to be the first operator of a sequence.
//
// A fork is described by concatenating, in path order, the sources of the
// first operator of each of its paths. The default, file, and HTTP scans are
// reported as Path values; pool-based operators are reported as Pool values
// via sourceOfPool, which panics if lk is nil; lake metadata scans are
// reported as LakeMeta values.
//
// An error is returned for any other operator type, for a fork that contains
// an empty path, or if a pool cannot be opened.
func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
	switch o := o.(type) {
	case *dag.Fork:
		var s []Source
		for _, p := range o.Paths {
			// An empty path has no leading operator to describe; report it
			// as an error instead of panicking on the index below.
			if len(p) == 0 {
				return nil, errors.New("internal error: fork path cannot be empty")
			}
			out, err := describeSources(ctx, lk, p[0])
			if err != nil {
				return nil, err
			}
			s = append(s, out...)
		}
		return s, nil
	case *dag.DefaultScan:
		return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
	case *dag.FileScan:
		return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
	case *dag.HTTPScan:
		return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
	case *dag.PoolScan:
		return sourceOfPool(ctx, lk, o.ID)
	case *dag.Lister:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.SeqScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.CommitMetaScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.LakeMetaScan:
		return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
	default:
		return nil, fmt.Errorf("unsupported source type %T", o)
	}
}

// sourceOfPool opens the pool with the given id in lk and returns a
// one-element slice holding a Pool description with that id and the opened
// pool's name. An error from opening the pool is returned unchanged.
//
// A nil lk is treated as a broken internal invariant and causes a panic.
func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
	if lk == nil {
		panic(errors.New("internal error: lake operation cannot be used in non-lake context"))
	}
	pool, err := lk.OpenPool(ctx, id)
	if err != nil {
		return nil, err
	}
	desc := &Pool{Kind: "Pool", Name: pool.Name, ID: id}
	return []Source{desc}, nil
}

// describeAggs threads the aggregation state through every operator of seq
// in order, starting from parents, and returns the state left after the last
// operator. For an empty seq, parents is returned as is.
func describeAggs(seq dag.Seq, parents []*Aggregation) []*Aggregation {
	current := parents
	for i := range seq {
		current = describeOpAggs(seq[i], current)
	}
	return current
}

// describeOpAggs returns the aggregation state that results from applying op
// to the incoming state parents.
//
//   - Fork and Scatter ignore parents and return the concatenation of the
//     states of their paths, each path starting from a single nil entry.
//   - Summarize returns a single Aggregation built from its keys.
//   - Any other operator passes parents through unchanged, unless there is
//     more than one parent, in which case the state collapses to a single nil
//     entry.
func describeOpAggs(op dag.Op, parents []*Aggregation) []*Aggregation {
	switch node := op.(type) {
	case *dag.Fork:
		var merged []*Aggregation
		for _, path := range node.Paths {
			merged = append(merged, describeAggs(path, []*Aggregation{nil})...)
		}
		return merged
	case *dag.Scatter:
		var merged []*Aggregation
		for _, path := range node.Paths {
			merged = append(merged, describeAggs(path, []*Aggregation{nil})...)
		}
		return merged
	case *dag.Summarize:
		summary := &Aggregation{}
		for i := range node.Keys {
			// NOTE(review): this assertion panics if a key's left-hand side
			// is not a *dag.This; presumably semantic analysis guarantees
			// that it always is, but confirm.
			summary.Keys = append(summary.Keys, node.Keys[i].LHS.(*dag.This).Path)
		}
		return []*Aggregation{summary}
	}
	// Zero or one parent carries through untouched; several parents are
	// reset to a single nil aggregation.
	if len(parents) <= 1 {
		return parents
	}
	return []*Aggregation{nil}
}
4 changes: 4 additions & 0 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (o *Optimizer) OptimizeDeleter(seq dag.Seq, replicas int) (dag.Seq, error)
return dag.Seq{lister, scatter, merge}, nil
}

// SortKeys returns the sort keys obtained by running propagateSortKey over
// seq, starting from a single order.Nil key.
//
// The propagation is given copyOps(seq) rather than seq itself, presumably
// so that the caller's DAG is left unmodified (confirm against copyOps).
func (o *Optimizer) SortKeys(seq dag.Seq) ([]order.SortKey, error) {
	ops := copyOps(seq)
	initial := []order.SortKey{order.Nil}
	return o.propagateSortKey(ops, initial)
}

func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
return walkEntries(seq, func(seq dag.Seq) (dag.Seq, error) {
if len(seq) == 0 {
Expand Down
1 change: 1 addition & 0 deletions service/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (c *Core) addAPIServerRoutes() {
c.authhandle("/pool/{pool}/revision/{revision}/vector", handleVectorDelete).Methods("DELETE")
c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
c.authhandle("/query/describe", handleQueryDescribe).Methods("OPTIONS", "POST")
c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
}

Expand Down
16 changes: 16 additions & 0 deletions service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"github.com/brimdata/zed/api/queryio"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/ast"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
lakeapi "github.com/brimdata/zed/lake/api"
"github.com/brimdata/zed/lake/commits"
"github.com/brimdata/zed/lake/journal"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/exec"
"github.com/brimdata/zed/runtime/sam/op"
Expand Down Expand Up @@ -170,6 +172,20 @@ func handleCompile(c *Core, w *ResponseWriter, r *Request) {
w.Respond(http.StatusOK, ast)
}

// handleQueryDescribe serves the /query/describe route. It decodes an
// api.QueryRequest from the request, describes the query against the core's
// lake root using compiler.Describe, and responds with the resulting info
// and status 200.
//
// If the request cannot be unmarshaled the handler returns without
// responding itself. Any error from compiler.Describe is reported wrapped in
// srverr.ErrInvalid.
func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
	var body api.QueryRequest
	if ok := r.Unmarshal(w, &body); !ok {
		return
	}
	source := data.NewSource(storage.NewRemoteEngine(), c.root)
	description, err := compiler.Describe(body.Query, source, &body.Head)
	if err != nil {
		w.Error(srverr.ErrInvalid(err))
		return
	}
	w.Respond(http.StatusOK, description)
}

func handleBranchGet(c *Core, w *ResponseWriter, r *Request) {
branchName, ok := r.StringFromPath(w, "branch")
if !ok {
Expand Down
76 changes: 76 additions & 0 deletions service/ztests/query-describe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Exercises the /query/describe service endpoint. The script creates pools
# test1 and test2, then for each query file posts its contents (JSON-encoded
# with jq) together with head pool "test1" to $ZED_LAKE/query/describe. The
# response is piped through zq, which rewrites every source id to "XXX" so
# the expected output does not depend on generated pool IDs.
# NOTE(review): in the agg.zed case the expected "sources" value is a single
# object rather than a one-element array; presumably this is an artifact of
# the `over sources` rewrite in the script. Confirm.
script: |
source service.sh
zed create -q test1
zed create -q test2
for file in multifrom.zed agg.zed; do
echo // === $file ===
query="$(cat $file | jq -Rsa .)"
curl -H "Accept: application/json" -d "{\"query\":$query,\"head\":{\"pool\":\"test1\"}}" $ZED_LAKE/query/describe |
zq -J 'sources := (over sources | id := "XXX")' -
done
inputs:
- name: service.sh
- name: multifrom.zed
data: |
from (
pool test1
pool test2
) | put foo := "bar"
- name: agg.zed
data: |
count() by key1:=v1, key2
outputs:
- name: stdout
data: |
// === multifrom.zed ===
{
"sources": [
{
"kind": "Pool",
"name": "test1",
"id": "XXX"
},
{
"kind": "Pool",
"name": "test2",
"id": "XXX"
}
],
"aggregations": [
null
],
"sort_keys": [
{
"order": "desc",
"keys": [
[
"ts"
]
]
}
]
}
// === agg.zed ===
{
"sources": {
"kind": "Pool",
"name": "test1",
"id": "XXX"
},
"aggregations": [
{
"keys": [
[
"key1"
],
[
"key2"
]
]
}
],
"sort_keys": null
}

0 comments on commit 97c3c56

Please sign in to comment.