Skip to content

Commit

Permalink
add more query commands (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund authored Nov 10, 2023
1 parent 5e9639a commit 8eb96ac
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 42 deletions.
208 changes: 169 additions & 39 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package cmd

import (
"context"
"errors"
"fmt"
"github.com/rockset/cli/lookup"
"io"
"os"
"strings"
Expand All @@ -14,19 +14,22 @@ import (

"github.com/rockset/rockset-go-client"
"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
"github.com/rockset/rockset-go-client/paginate"

"github.com/rockset/cli/config"
"github.com/rockset/cli/flag"
"github.com/rockset/cli/format"
"github.com/rockset/cli/lookup"
"github.com/rockset/cli/tui"
)

func newListQueryCmd() *cobra.Command {
func newListQueriesCmd() *cobra.Command {
cmd := cobra.Command{
Use: "query [virtual instance ID|NAME]",
Aliases: []string{"queries", "q"},
Use: "queries [ID|NAME]",
Aliases: []string{"query", "q"},
Short: "list queries",
Long: "list all active queries, or on a specific virtual instance",
Long: "list all actively queued and executing queries, or on a specific virtual instance",
Args: cobra.RangeArgs(0, 1),
Annotations: group("query"), // TODO should this be in the VI group too?
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -61,6 +64,108 @@ func newListQueryCmd() *cobra.Command {
return &cmd
}

func newGetQueryInfoCmd() *cobra.Command {
cmd := cobra.Command{
Use: "info ID",
Aliases: []string{"i"},
Short: "get query info",
Long: "get information about a query",
Args: cobra.ExactArgs(1),
Annotations: group("query"),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

rs, err := config.Client(cmd, Version)
if err != nil {
return err
}

q, err := rs.GetQueryInfo(ctx, args[0])
if err != nil {
return err
}

return formatOne(cmd, q)
},
}

cmd.Flags().Bool(flag.Wide, false, "display more information")

return &cmd
}

func newGetQueryResultCmd() *cobra.Command {
cmd := cobra.Command{
Use: "results ID",
Aliases: []string{"r"},
Short: "get query results",
Long: fmt.Sprintf("Get query results for a previously executed query. "+
"If --%s isn't specified, all documents are retrieved.", flag.Docs),
Args: cobra.ExactArgs(1),
Annotations: group("query"),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

rs, err := config.Client(cmd, Version)
if err != nil {
return err
}

// have to get info to get thew query stats
info, err := rs.GetQueryInfo(ctx, args[0])
if err != nil {
return err
}

var options []option.QueryResultOption
if cursor, _ := cmd.Flags().GetString(flag.Cursor); cursor != "" {
options = append(options, option.WithQueryResultCursor(cursor))
}
docs, _ := cmd.Flags().GetInt32(flag.Docs)
if docs != 0 {
options = append(options, option.WithQueryResultDocs(docs))
}
if offset, _ := cmd.Flags().GetInt32(flag.Offset); offset != 0 {
options = append(options, option.WithQueryResultOffset(offset))
}

stats := info.GetStats()
var list []map[string]interface{}
var cursor string
// if we don't have any options, we fetch all documents
if len(options) == 0 {
p := paginate.New(rs)
docCh := make(chan map[string]any, 100)

go func() {
err = p.GetQueryResults(ctx, docCh, args[0])
}()

for doc := range docCh {
list = append(list, doc)
}
} else {
result, err := rs.GetQueryResults(ctx, args[0], options...)
if err != nil {
return err
}
page := result.GetPagination()
cursor = page.GetNextCursor()
list = result.Results
}

return showQueryPaginationResponse(cmd.OutOrStdout(), cursor, stats.GetElapsedTimeMs(), list)
},
}

cmd.Flags().Bool(flag.Wide, false, "display more information")
cmd.Flags().String(flag.Cursor, "", "cursor to current page, defaults to first page")
cmd.Flags().Int32(flag.Docs, 0, "number of documents to fetch, 0 means fetch max")
cmd.Flags().Int32(flag.Offset, 0, "offset from the cursor of the first document to be returned")

return &cmd
}

func newQueryCmd() *cobra.Command {
cmd := cobra.Command{
Use: "query SQL",
Expand All @@ -74,6 +179,7 @@ func newQueryCmd() *cobra.Command {
return err
}

async, _ := cmd.Flags().GetBool(flag.Async)
vi, _ := cmd.Flags().GetString(flag.VI)
file, _ := cmd.Flags().GetString(flag.File)
validate, _ := cmd.Flags().GetBool(flag.Validate)
Expand Down Expand Up @@ -110,20 +216,31 @@ func newQueryCmd() *cobra.Command {
}

_, _ = fmt.Fprintf(cmd.OutOrStdout(), "SQL is valid\n")
return nil
}

var options []option.QueryOption
if async {
// TODO inform the user that --validate and --async are mutually exclusive?
options = append(options, option.WithAsync())
}

var result openapi.QueryResponse
if vi == "" {
result, err = rs.Query(ctx, sql)
result, err = rs.Query(ctx, sql, options...)
} else {
result, err = rs.ExecuteQueryOnVirtualInstance(ctx, vi, sql)
result, err = rs.ExecuteQueryOnVirtualInstance(ctx, vi, sql, options...)
}

if err != nil {
return err
}

err = showQueryResult(cmd.OutOrStdout(), result)
if async {
_, _ = fmt.Fprintf(cmd.OutOrStdout(), "query ID is: %s\n", result.GetQueryId())
return nil
}

err = showQueryResponse(cmd.OutOrStdout(), result)
if err != nil {
return err
}
Expand All @@ -132,6 +249,7 @@ func newQueryCmd() *cobra.Command {
},
}

cmd.Flags().Bool(flag.Async, false, "execute the query asynchronously")
cmd.Flags().Bool(flag.Validate, false, "validate SQL")
cmd.Flags().String(flag.File, "", "read SQL from file")
cmd.Flags().String(flag.VI, "", "execute query on virtual instance")
Expand All @@ -140,7 +258,22 @@ func newQueryCmd() *cobra.Command {
return &cmd
}

func showQueryResult(out io.Writer, result openapi.QueryResponse) error {
func showQueryPaginationResponse(out io.Writer, cursor string, elapsedMs int64, results []map[string]interface{}) error {
if len(results) == 0 {
return errors.New("query returned no rows")
}

var headers []string
for h := range results[0] {
headers = append(headers, h)
}

showQueryResponseTable(out, cursor, elapsedMs, headers, results)

return nil
}

func showQueryResponse(out io.Writer, result openapi.QueryResponse) error {
switch result.GetStatus() {
case "ERROR":
var errs []string
Expand All @@ -151,48 +284,45 @@ func showQueryResult(out io.Writer, result openapi.QueryResponse) error {
case "QUEUED", "RUNNING":
_, _ = fmt.Fprintf(out, "your query %s is %s\n", result.GetQueryId(), result.GetStatus())
case "COMPLETED":
t := tui.NewTable(out)

var headers []string
if len(result.GetColumnFields()) == 0 {
// pick out the headers from the first doc
var headers []string
for k := range result.Results[0] {
headers = append(headers, k)
}
t.Headers(headers...)

for _, row := range result.Results {
var r []string
for _, h := range headers {
r = append(r, fmt.Sprintf("%v", row[h]))
}
t.Row(r...)
// in a "SELECT *" query the ColumnFields isn't populated what order should the columns be presented in?
for h := range result.Results[0] {
headers = append(headers, h)
}
} else {
var headers []string
for _, h := range result.GetColumnFields() {
headers = append(headers, h.Name)
}
t.Headers(headers...)

for _, row := range result.Results {
var r []string
for _, column := range result.GetColumnFields() {
r = append(r, fmt.Sprintf("%v", row[column.GetName()]))
}
t.Row(r...)
}
}

_, _ = fmt.Fprintln(out, t.Render())
_, _ = fmt.Fprintf(out, "Elapsed time: %d ms\n\n", result.Stats.GetElapsedTimeMs())
stats := result.GetStats()
showQueryResponseTable(out, "", stats.GetElapsedTimeMs(), headers, result.Results)
default:
return fmt.Errorf("unexpected query status: %s", result.GetStatus())
}

return nil
}

func showQueryResponseTable(out io.Writer, cursor string, elapsedMs int64, headers []string, results []map[string]interface{}) {
t := tui.NewTable(out)
t.Headers(headers...)

for _, row := range results {
var r []string
for _, h := range headers {
r = append(r, fmt.Sprintf("%v", row[h]))
}
t.Row(r...)
}

_, _ = fmt.Fprintln(out, t.Render())
if cursor != "" {
_, _ = fmt.Fprintf(out, "Next cursor: %s\n", cursor)
}
_, _ = fmt.Fprintf(out, "Elapsed time: %d ms\n\n", elapsedMs)
}

func interactiveQuery(ctx context.Context, in io.ReadCloser, out io.Writer, rs *rockset.RockClient) error {
histFile, err := config.HistoryFile()
if err != nil {
Expand Down Expand Up @@ -264,7 +394,7 @@ func executeQuery(ctx context.Context, out io.Writer, rs *rockset.RockClient, sq
return
}

if err = showQueryResult(out, result); err != nil {
if err = showQueryResponse(out, result); err != nil {
slog.Error("failed to show result", err)
}
}
2 changes: 1 addition & 1 deletion cmd/query_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func NewExecuteQueryLambdaCmd() *cobra.Command {
return err
}

return showQueryResult(cmd.OutOrStdout(), resp)
return showQueryResponse(cmd.OutOrStdout(), resp)
},
}

Expand Down
15 changes: 13 additions & 2 deletions cmd/verbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func addVerbs(root *cobra.Command) {
Long: "list Rockset resources",
}

queryCmd := cobra.Command{
Aliases: []string{"q"},
Short: "query resources",
Long: "query Rockset resources",
}

resumeCmd := cobra.Command{
Use: "resume",
Short: "resume resources",
Expand Down Expand Up @@ -105,7 +111,6 @@ func addVerbs(root *cobra.Command) {
deleteCmd.AddCommand(newDeleteCollectionCmd())
getCmd.AddCommand(newGetCollectionCmd())
listCmd.AddCommand(newListCollectionsCmd())
listCmd.AddCommand(newListQueryCmd())
sampleCmd.AddCommand(newCreateSampleCollectionCmd())
tailCmd.AddCommand(newCreateTailCollectionCmd())

Expand All @@ -114,6 +119,13 @@ func addVerbs(root *cobra.Command) {
getCmd.AddCommand(newGetIntegrationCmd())
listCmd.AddCommand(newListIntegrationsCmd())

// query
getCmd.AddCommand(&queryCmd)
listCmd.AddCommand(newListQueriesCmd()) // list queries
queryCmd.AddCommand(newGetQueryInfoCmd()) // get query info
queryCmd.AddCommand(newGetQueryResultCmd()) // get query result
root.AddCommand(newQueryCmd()) // execute a query

// org
getCmd.AddCommand(newGetOrganizationCmd())

Expand Down Expand Up @@ -194,7 +206,6 @@ func addVerbs(root *cobra.Command) {
root.AddCommand(&useCmd)
root.AddCommand(newVersionCmd())

root.AddCommand(newQueryCmd())
root.AddCommand(newIngestCmd())

root.AddCommand(newTestCmd())
Expand Down
4 changes: 4 additions & 0 deletions flag/command.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package flag

const (
Async = "async"
Bucket = "bucket"
Compression = "compression"
Collection = "collection"
Cursor = "cursor"
Dataset = "dataset"
Description = "description"
Docs = "docs"
Email = "email"
File = "file"
Force = "force"
IngestTransformation = "ingest-transformation"
Integration = "integration"
Offset = "offset"
Pattern = "pattern"
Region = "region"
Retention = "retention"
Expand Down

0 comments on commit 8eb96ac

Please sign in to comment.