Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define and implement a new query interface #832

Merged
merged 1 commit into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 12 additions & 31 deletions cmd/cayley/command/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import (
"github.com/spf13/viper"

"github.com/cayleygraph/cayley/clog"
"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/internal/repl"
"github.com/cayleygraph/cayley/query"
"github.com/cayleygraph/quad"
)

const (
Expand Down Expand Up @@ -117,38 +115,21 @@ func NewQueryCmd() *cobra.Command {
if err != nil {
return err
}

l := query.GetLanguage(lang)
if l == nil {
return fmt.Errorf("unknown query language: %q", lang)
}
enc := json.NewEncoder(os.Stdout)
sess := l.Session(h)
ch := make(chan query.Result, 100)
go sess.Execute(ctx, querystr, ch, limit)
for i := 0; limit <= 0 || i < limit; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case r, ok := <-ch:
if !ok {
return nil
} else if err = r.Err(); err != nil {
return err
}
obj := r.Result()
switch p := obj.(type) {
case map[string]graph.Ref:
m := make(map[string]quad.Value, len(p))
for k, v := range p {
m[k] = h.NameOf(v)
}
obj = m
}
enc.Encode(obj)
it, err := query.Execute(ctx, h, lang, querystr, query.Options{
Collation: query.JSON,
Limit: limit,
})
if err != nil {
return err
}
defer it.Close()
for i := 0; it.Next(ctx) && (limit <= 0 || i < limit); i++ {
if err = enc.Encode(it.Result()); err != nil {
return err
}
}
return nil
return it.Err()
},
}
registerQueryFlags(cmd)
Expand Down
70 changes: 37 additions & 33 deletions graph/graphtest/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,26 +507,22 @@ func checkQueries(t *testing.T, qs graph.QuadStore, timeout time.Duration) {
}
start := time.Now()
ses := gizmo.NewSession(qs)
c := make(chan query.Result, 5)
ctx := context.Background()
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
go ses.Execute(ctx, test.query, c, -1)
it, err := ses.Execute(ctx, test.query, query.Options{
Collation: query.JSON,
})
if err != nil {
t.Fatal(err)
}
defer it.Close()
var got []interface{}
for r := range c {
if err := r.Err(); err != nil {
t.Error("Error:", err)
continue
}
ses.Collate(r)
j, err := ses.Results()
if j == nil && err == nil {
continue
}
got = append(got, j.([]interface{})...)
for it.Next(ctx) {
got = append(got, it.Result())
}
t.Logf("%12v %v", time.Since(start), test.message)

Expand Down Expand Up @@ -580,26 +576,34 @@ func benchmarkQueries(b *testing.B, gen testutil.DatabaseFunc) {
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := make(chan query.Result, 5)
ctx := context.Background()
var cancel func()
if timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
ses := gizmo.NewSession(qs)
b.StartTimer()
go ses.Execute(ctx, bench.query, c, -1)
n := 0
for range c {
n++
}
b.StopTimer()
if n != len(bench.expect) {
b.Fatalf("unexpected number of results: %d vs %d", n, len(bench.expect))
}
if cancel != nil {
cancel()
}
func() {
ctx := context.Background()
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
ses := gizmo.NewSession(qs)
b.StartTimer()
it, err := ses.Execute(ctx, bench.query, query.Options{
Collation: query.Raw,
})
if err != nil {
b.Fatal(err)
}
defer it.Close()
n := 0
for it.Next(ctx) {
n++
}
if err = it.Err(); err != nil {
b.Fatal(err)
}
b.StopTimer()
if n != len(bench.expect) {
b.Fatalf("unexpected number of results: %d vs %d", n, len(bench.expect))
}
}()
}
})
}
Expand Down
30 changes: 14 additions & 16 deletions internal/http/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,25 @@ func (api *API) ServeV1Query(w http.ResponseWriter, r *http.Request, params http
errFunc(w, err)
return
}
code := string(bodyBytes)

c := make(chan query.Result, 5)
go ses.Execute(ctx, code, c, limit)
it, err := ses.Execute(ctx, string(bodyBytes), query.Options{
Collation: query.JSON,
Limit: limit,
})
if err != nil {
errFunc(w, err)
return
}
defer it.Close()

for res := range c {
if err := res.Err(); err != nil {
if err == nil {
continue // wait for results channel to close
}
errFunc(w, err)
return
}
ses.Collate(res)
var out []interface{}
for it.Next(ctx) {
out = append(out, it.Result())
}
output, err := ses.Results()
if err != nil {
if err = it.Err(); err != nil {
errFunc(w, err)
return
}
_ = WriteResult(w, output)
_ = WriteResult(w, out)
}

func (api *API) ServeV1Shape(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
Expand Down
20 changes: 13 additions & 7 deletions internal/repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,21 @@ func Run(ctx context.Context, qu string, ses query.REPLSession) error {
}
}()
fmt.Printf("\n")
c := make(chan query.Result, 5)
go ses.Execute(ctx, qu, c, 100)
for res := range c {
if err := res.Err(); err != nil {
return err
}
fmt.Print(ses.FormatREPL(res))
it, err := ses.Execute(ctx, qu, query.Options{
Collation: query.REPL,
Limit: 100,
})
if err != nil {
return err
}
defer it.Close()
for it.Next(ctx) {
fmt.Print(it.Result())
nResults++
}
if err := it.Err(); err != nil {
return err
}
if nResults > 0 {
results := "Result"
if nResults > 1 {
Expand Down
Loading