Add query describe endpoint
This commit adds the query describe endpoint: a service endpoint that
returns information about a posted query. The endpoint is meant for
internal use from within ZUI and is not intended for public consumption.
mattnibs committed May 31, 2024

1 parent 9db943d commit d30342f
Showing 6 changed files with 302 additions and 1 deletion.
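
For orientation before the diffs, here is a minimal sketch of exercising the new endpoint from Go. The lake address (localhost:9867, the default serve port) and the pool name test1 are assumptions for illustration, not part of this commit; the payload mirrors the api.QueryRequest already accepted by /query.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical query and pool; point this at a pool that exists in your lake.
	body := []byte(`{"query":"count() by key1:=v1, key2","head":{"pool":"test1"}}`)
	req, err := http.NewRequest("POST", "http://localhost:9867/query/describe", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// The response body is the describe.Info value for the posted query.
	b, _ := io.ReadAll(resp.Body)
	fmt.Println(string(b))
}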
26 changes: 26 additions & 0 deletions compiler/describe.go
@@ -0,0 +1,26 @@
package compiler

import (
	"context"
	"errors"

	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/compiler/describe"
	"github.com/brimdata/zed/compiler/semantic"
	"github.com/brimdata/zed/lakeparse"
)

func Describe(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
	seq, err := Parse(query)
	if err != nil {
		return nil, err
	}
	if len(seq) == 0 {
		return nil, errors.New("internal error: AST seq cannot be empty")
	}
	entry, err := semantic.AnalyzeAddSource(ctx, seq, src, head)
	if err != nil {
		return nil, err
	}
	return describe.Analyze(ctx, src, entry)
}
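
For reference, a minimal sketch (not part of this commit) of calling Describe outside the service, assuming a nil lake root so that a query with no from clause falls back to the default stdin source; the service handler added below passes a real lake root and the request's head instead.

package main

import (
	"context"
	"fmt"

	"github.com/brimdata/zed/compiler"
	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/pkg/storage"
)

func main() {
	// No lake root: only file, HTTP, and default (stdin) sources are describable.
	src := data.NewSource(storage.NewLocalEngine(), nil)
	// Nil head, assuming it is unused when the query names no pool.
	info, err := compiler.Describe(context.Background(), "count() by key1", src, nil)
	if err != nil {
		panic(err)
	}
	for _, ch := range info.Channels {
		fmt.Printf("aggregation keys: %v sort: %v\n", ch.AggregationKeys, ch.Sort)
	}
}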
160 changes: 160 additions & 0 deletions compiler/describe/analyze.go
@@ -0,0 +1,160 @@
package describe

import (
	"context"
	"errors"
	"fmt"

	"github.com/brimdata/zed/compiler/ast/dag"
	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/compiler/optimizer"
	"github.com/brimdata/zed/lake"
	"github.com/brimdata/zed/order"
	"github.com/brimdata/zed/pkg/field"
	"github.com/segmentio/ksuid"
)

type Info struct {
	Sources  []Source  `json:"sources"`
	Channels []Channel `json:"channels"`
}

type Source interface {
	Source()
}

type (
	LakeMeta struct {
		Kind string `json:"kind"`
		Meta string `json:"meta"`
	}
	Pool struct {
		Kind string      `json:"kind"`
		Name string      `json:"name"`
		ID   ksuid.KSUID `json:"id"`
	}
	Path struct {
		Kind string `json:"kind"`
		URI  string `json:"uri"`
	}
)

func (*LakeMeta) Source() {}
func (*Pool) Source()     {}
func (*Path) Source()     {}

type Channel struct {
	AggregationKeys field.List     `json:"aggregation_keys"`
	Sort            *order.SortKey `json:"sort"`
}

func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
	var info Info
	var err error
	if info.Sources, err = describeSources(ctx, source.Lake(), seq[0]); err != nil {
		return nil, err
	}
	sortKeys, err := optimizer.New(ctx, source).SortKeys(seq)
	if err != nil {
		return nil, err
	}
	aggKeys := describeAggs(seq, []field.List{nil})
	for i := range sortKeys {
		// Convert SortKey to a pointer so a nil sort is encoded as null for
		// JSON/ZSON.
		var s *order.SortKey
		if !sortKeys[i].IsNil() {
			s = &sortKeys[i]
		}
		info.Channels = append(info.Channels, Channel{
			Sort:            s,
			AggregationKeys: aggKeys[i],
		})
	}
	return &info, nil
}

func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
	switch o := o.(type) {
	case *dag.Fork:
		var s []Source
		for _, p := range o.Paths {
			out, err := describeSources(ctx, lk, p[0])
			if err != nil {
				return nil, err
			}
			s = append(s, out...)
		}
		return s, nil
	case *dag.DefaultScan:
		return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
	case *dag.FileScan:
		return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
	case *dag.HTTPScan:
		return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
	case *dag.PoolScan:
		return sourceOfPool(ctx, lk, o.ID)
	case *dag.Lister:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.SeqScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.CommitMetaScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.LakeMetaScan:
		return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
	default:
		return nil, fmt.Errorf("unsupported source type %T", o)
	}
}

func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
	if lk == nil {
		panic(errors.New("internal error: lake operation cannot be used in non-lake context"))
	}
	p, err := lk.OpenPool(ctx, id)
	if err != nil {
		return nil, err
	}
	return []Source{&Pool{
		Kind: "Pool",
		ID:   id,
		Name: p.Name,
	}}, nil
}

func describeAggs(seq dag.Seq, parents []field.List) []field.List {
	for _, op := range seq {
		parents = describeOpAggs(op, parents)
	}
	return parents
}

func describeOpAggs(op dag.Op, parents []field.List) []field.List {
	switch op := op.(type) {
	case *dag.Fork:
		var aggs []field.List
		for _, p := range op.Paths {
			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
		}
		return aggs
	case *dag.Scatter:
		var aggs []field.List
		for _, p := range op.Paths {
			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
		}
		return aggs
	case *dag.Summarize:
		// The field list for an aggregation with no keys is an empty slice,
		// not nil.
		keys := field.List{}
		for _, k := range op.Keys {
			keys = append(keys, k.LHS.(*dag.This).Path)
		}
		return []field.List{keys}
	}
	// If there is more than one parent, reset to nil aggregation.
	if len(parents) > 1 {
		return []field.List{nil}
	}
	return parents
}
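
To make the nil-versus-empty distinction above concrete, a small sketch (not part of this commit) of how Channel values marshal with encoding/json defaults: keyed aggregations carry their key paths, a keyless aggregation encodes as [], and nil keys or a nil sort encode as null.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/brimdata/zed/compiler/describe"
	"github.com/brimdata/zed/pkg/field"
)

func main() {
	channels := []describe.Channel{
		// "count() by key1, key2": two single-element key paths, no inherited sort.
		{AggregationKeys: field.List{field.Path{"key1"}, field.Path{"key2"}}},
		// "sum(this)": an empty slice, not nil, so it encodes as [] rather than null.
		{AggregationKeys: field.List{}},
		// A pass-through channel: nil keys and nil sort both encode as null.
		{},
	}
	for _, ch := range channels {
		b, _ := json.Marshal(ch)
		fmt.Println(string(b))
	}
}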
6 changes: 5 additions & 1 deletion compiler/optimizer/optimizer.go
@@ -257,6 +257,10 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
	})
}

func (o *Optimizer) SortKeys(seq dag.Seq) ([]order.SortKey, error) {
	return o.propagateSortKey(copyOps(seq), []order.SortKey{order.Nil})
}

// propagateSortKey analyzes a Seq and attempts to push the scan order of the data source
// into the first downstream aggregation. (We could continue the analysis past that
// point but don't bother yet because we do not yet support any optimization
@@ -330,7 +334,7 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKey) ([]or
		// We'll leave this as unknown for now and not try to optimize
		// downstream of the first groupby unless there is an explicit
		// sort encountered.
		return nil, nil
		return []order.SortKey{order.Nil}, nil
	case *dag.Fork:
		var keys []order.SortKey
		for _, seq := range op.Paths {
1 change: 1 addition & 0 deletions service/core.go
@@ -188,6 +188,7 @@ func (c *Core) addAPIServerRoutes() {
c.authhandle("/pool/{pool}/revision/{revision}/vector", handleVectorDelete).Methods("DELETE")
c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
c.authhandle("/query/describe", handleQueryDescribe).Methods("OPTIONS", "POST")
c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
}

16 changes: 16 additions & 0 deletions service/handlers.go
@@ -12,12 +12,14 @@ import (
"github.com/brimdata/zed/api/queryio"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/ast"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
lakeapi "github.com/brimdata/zed/lake/api"
"github.com/brimdata/zed/lake/commits"
"github.com/brimdata/zed/lake/journal"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/exec"
"github.com/brimdata/zed/runtime/sam/op"
@@ -170,6 +172,20 @@ func handleCompile(c *Core, w *ResponseWriter, r *Request) {
	w.Respond(http.StatusOK, ast)
}

func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
	var req api.QueryRequest
	if !r.Unmarshal(w, &req) {
		return
	}
	src := data.NewSource(storage.NewRemoteEngine(), c.root)
	info, err := compiler.Describe(r.Context(), req.Query, src, &req.Head)
	if err != nil {
		w.Error(srverr.ErrInvalid(err))
		return
	}
	w.Respond(http.StatusOK, info)
}

func handleBranchGet(c *Core, w *ResponseWriter, r *Request) {
	branchName, ok := r.StringFromPath(w, "branch")
	if !ok {
94 changes: 94 additions & 0 deletions service/ztests/query-describe.yaml
@@ -0,0 +1,94 @@
script: |
  source service.sh
  zed create -q test1
  zed create -q test2
  for file in multifrom.zed agg.zed agg-no-keys.zed; do
    echo // === $file ===
    query="$(cat $file | jq -Rsa .)"
    curl -H "Accept: application/json" -d "{\"query\":$query,\"head\":{\"pool\":\"test1\"}}" $ZED_LAKE/query/describe |
      zq -J 'sources := (over sources | id := "XXX")' -
  done
inputs:
  - name: service.sh
  - name: multifrom.zed
    data: |
      from (
        pool test1
        pool test2
      ) | put foo := "bar"
  - name: agg.zed
    data: |
      count() by key1:=v1, key2
  - name: agg-no-keys.zed
    data: |
      sum(this)
outputs:
  - name: stdout
    data: |
      // === multifrom.zed ===
      {
        "sources": [
          {
            "kind": "Pool",
            "name": "test1",
            "id": "XXX"
          },
          {
            "kind": "Pool",
            "name": "test2",
            "id": "XXX"
          }
        ],
        "channels": [
          {
            "aggregation_keys": null,
            "sort": {
              "order": "desc",
              "keys": [
                [
                  "ts"
                ]
              ]
            }
          }
        ]
      }
      // === agg.zed ===
      {
        "sources": {
          "kind": "Pool",
          "name": "test1",
          "id": "XXX"
        },
        "channels": [
          {
            "aggregation_keys": [
              [
                "key1"
              ],
              [
                "key2"
              ]
            ],
            "sort": null
          }
        ]
      }
      // === agg-no-keys.zed ===
      {
        "sources": {
          "kind": "Pool",
          "name": "test1",
          "id": "XXX"
        },
        "channels": [
          {
            "aggregation_keys": [
            ],
            "sort": null
          }
        ]
      }
