Skip to content

Commit

Permalink
add more query commands
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund committed Nov 1, 2023
1 parent 8f889d9 commit a5ae1e4
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 43 deletions.
4 changes: 4 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ const (

// command specific flags
const (
AsyncFlag = "async"
BucketFlag = "bucket"
CompressionFlag = "compression"
CollectionFlag = "collection"
CursorFlag = "cursor"
DatasetFlag = "dataset"
DescriptionFlag = "description"
DocsFlag = "docs"
FileFlag = "file"
ForceFlag = "force"
IngestTransformation = "ingest-transformation"
IntegrationFlag = "integration"
OffsetFlag = "offset"
PatternFlag = "pattern"
RegionFlag = "region"
RetentionFlag = "retention"
Expand Down
190 changes: 150 additions & 40 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -13,18 +14,19 @@ import (

"github.com/rockset/rockset-go-client"
"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"

"github.com/rockset/cli/config"
"github.com/rockset/cli/format"
"github.com/rockset/cli/tui"
)

func newListQueryCmd() *cobra.Command {
func newListQueriesCmd() *cobra.Command {
cmd := cobra.Command{
Use: "query [virtual instance ID]",
Aliases: []string{"queries", "q"},
Use: "queries [virtual instance ID]",
Aliases: []string{"query", "q"},
Short: "list queries",
Long: "list all active queries, or on a specific virtual instance",
Long: "list all actively queued and executing queries, or on a specific virtual instance",
Args: cobra.RangeArgs(0, 1),
Annotations: group("query"), // TODO should this be in the VI group too?
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -54,6 +56,91 @@ func newListQueryCmd() *cobra.Command {
return &cmd
}

func newGetQueryInfoCmd() *cobra.Command {
cmd := cobra.Command{
Use: "info ID",
Aliases: []string{"i"},
Short: "get query info",
Long: "get information about a query",
Args: cobra.ExactArgs(1),
Annotations: group("query"),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

rs, err := rockClient(cmd)
if err != nil {
return err
}

q, err := rs.GetQueryInfo(ctx, args[0])
if err != nil {
return err
}

return formatOne(cmd, q)
},
}

cmd.Flags().Bool(WideFlag, false, "display more information")

return &cmd
}

func newGetQueryResultCmd() *cobra.Command {
cmd := cobra.Command{
Use: "results ID",
Aliases: []string{"r"},
Short: "get query results",
Long: "get query results",
Args: cobra.ExactArgs(1),
Annotations: group("query"),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

rs, err := rockClient(cmd)
if err != nil {
return err
}

// have to get info to get thew query stats
info, err := rs.GetQueryInfo(ctx, args[0])
if err != nil {
return err
}

var options []option.QueryResultOption

Check failure on line 111 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.QueryResultOption

Check failure on line 111 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.QueryResultOption
if cursor, _ := cmd.Flags().GetString(CursorFlag); cursor != "" {
options = append(options, option.WithQueryResultCursor(cursor))

Check failure on line 113 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.WithQueryResultCursor

Check failure on line 113 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.WithQueryResultCursor
}
if docs, _ := cmd.Flags().GetInt32(DocsFlag); docs != 0 {
options = append(options, option.WithQueryResultDocs(docs))

Check failure on line 116 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.WithQueryResultDocs

Check failure on line 116 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.WithQueryResultDocs
}
if offset, _ := cmd.Flags().GetInt32(OffsetFlag); offset != 0 {
options = append(options, option.WithQueryResultOffset(offset))

Check failure on line 119 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.WithQueryResultOffset

Check failure on line 119 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

undefined: option.WithQueryResultOffset
}

result, err := rs.GetQueryResults(ctx, args[0], options...)

Check failure on line 122 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

cannot use ... in call to non-variadic rs.GetQueryResults

Check failure on line 122 in cmd/query.go

View workflow job for this annotation

GitHub Actions / test

cannot use ... in call to non-variadic rs.GetQueryResults
if err != nil {
return err
}
page := result.GetPagination()
stats := info.GetStats()

// TODO should we add the option to set an environment variable e.g. ROCKSET_CURSOR
// with the "next cursor value" so consecutive calls gets the next page of results?

return showQueryPaginationResponse(cmd.OutOrStdout(), page.GetNextCursor(), stats.GetElapsedTimeMs(), result.Results)
},
}

cmd.Flags().Bool(WideFlag, false, "display more information")
cmd.Flags().String(CursorFlag, "", "cursor to current page, defaults to first page")
cmd.Flags().Int32(DocsFlag, 0, "number of documents to fetch, 0 means fetch max")
cmd.Flags().Int32(OffsetFlag, 0, "offset from the cursor of the first document to be returned")

return &cmd
}

func newQueryCmd() *cobra.Command {
cmd := cobra.Command{
Use: "query SQL",
Expand All @@ -70,6 +157,7 @@ func newQueryCmd() *cobra.Command {
vi, _ := cmd.Flags().GetString("vi")
file, _ := cmd.Flags().GetString(FileFlag)
validate, _ := cmd.Flags().GetBool(ValidateFlag)
async, _ := cmd.Flags().GetBool(AsyncFlag)

// TODO handle a parameterized query
var sql string
Expand Down Expand Up @@ -103,20 +191,31 @@ func newQueryCmd() *cobra.Command {
}

_, _ = fmt.Fprintf(cmd.OutOrStdout(), "SQL is valid\n")
return nil
}

var options []option.QueryOption
if async {
// TODO inform the user that --validate and --async are mutually exclusive?
options = append(options, option.WithAsync())
}

var result openapi.QueryResponse
if vi == "" {
result, err = rs.Query(ctx, sql)
result, err = rs.Query(ctx, sql, options...)
} else {
result, err = rs.ExecuteQueryOnVirtualInstance(ctx, vi, sql)
result, err = rs.ExecuteQueryOnVirtualInstance(ctx, vi, sql, options...)
}

if err != nil {
return err
}

err = showQueryResult(cmd.OutOrStdout(), result)
if async {
_, _ = fmt.Fprintf(cmd.OutOrStdout(), "query ID is: %s\n", result.GetQueryId())
return nil
}

err = showQueryResponse(cmd.OutOrStdout(), result)
if err != nil {
return err
}
Expand All @@ -125,6 +224,7 @@ func newQueryCmd() *cobra.Command {
},
}

cmd.Flags().Bool(AsyncFlag, false, "execute the query asynchronously")
cmd.Flags().Bool(ValidateFlag, false, "validate SQL")
cmd.Flags().String(FileFlag, "", "read SQL from file")
cmd.Flags().String("vi", "", "execute query on virtual instance")
Expand All @@ -133,7 +233,22 @@ func newQueryCmd() *cobra.Command {
return &cmd
}

func showQueryResult(out io.Writer, result openapi.QueryResponse) error {
func showQueryPaginationResponse(out io.Writer, cursor string, elapsedMs int64, results []map[string]interface{}) error {
if len(results) == 0 {
return errors.New("query returned no rows")
}

var headers []string
for h := range results[0] {
headers = append(headers, h)
}

showQueryResponseTable(out, cursor, elapsedMs, headers, results)

return nil
}

func showQueryResponse(out io.Writer, result openapi.QueryResponse) error {
switch result.GetStatus() {
case "ERROR":
var errs []string
Expand All @@ -144,50 +259,45 @@ func showQueryResult(out io.Writer, result openapi.QueryResponse) error {
case "QUEUED", "RUNNING":
_, _ = fmt.Fprintf(out, "your query %s is %s\n", result.GetQueryId(), result.GetStatus())
case "COMPLETED":
t := tui.NewTable(out)

var headers []string
if len(result.GetColumnFields()) == 0 {
// in a "SELECT *" query the ColumnFields isn't populated
// what order should the columns be presented in?

var headers []string
for k := range result.Results[0] {
headers = append(headers, k)
}
t.Headers(headers...)

for _, row := range result.Results {
var r []string
for _, h := range headers {
r = append(r, fmt.Sprintf("%v", row[h]))
}
t.Row(r...)
// in a "SELECT *" query the ColumnFields isn't populated what order should the columns be presented in?
for h := range result.Results[0] {
headers = append(headers, h)
}
} else {
var headers []string
for _, h := range result.GetColumnFields() {
headers = append(headers, h.Name)
}
t.Headers(headers...)

for _, row := range result.Results {
var r []string
for _, column := range result.GetColumnFields() {
r = append(r, fmt.Sprintf("%v", row[column.GetName()]))
}
t.Row(r...)
}
}

_, _ = fmt.Fprintln(out, t.Render())
_, _ = fmt.Fprintf(out, "Elapsed time: %d ms\n\n", result.Stats.GetElapsedTimeMs())
stats := result.GetStats()
showQueryResponseTable(out, "", stats.GetElapsedTimeMs(), headers, result.Results)
default:
return fmt.Errorf("unexpected query status: %s", result.GetStatus())
}

return nil
}

func showQueryResponseTable(out io.Writer, cursor string, elapsedMs int64, headers []string, results []map[string]interface{}) {
t := tui.NewTable(out)
t.Headers(headers...)

for _, row := range results {
var r []string
for _, h := range headers {
r = append(r, fmt.Sprintf("%v", row[h]))
}
t.Row(r...)
}

_, _ = fmt.Fprintln(out, t.Render())
if cursor != "" {
_, _ = fmt.Fprintf(out, "Next cursor: %s\n", cursor)
}
_, _ = fmt.Fprintf(out, "Elapsed time: %d ms\n\n", elapsedMs)
}

func interactiveQuery(ctx context.Context, in io.ReadCloser, out io.Writer, rs *rockset.RockClient) error {
histFile, err := config.HistoryFile()
if err != nil {
Expand Down Expand Up @@ -259,7 +369,7 @@ func executeQuery(ctx context.Context, out io.Writer, rs *rockset.RockClient, sq
return
}

if err = showQueryResult(out, result); err != nil {
if err = showQueryResponse(out, result); err != nil {
slog.Error("failed to show result", err)
}
}
2 changes: 1 addition & 1 deletion cmd/query_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func NewExecuteQueryLambdaCmd() *cobra.Command {
return err
}

return showQueryResult(cmd.OutOrStdout(), resp)
return showQueryResponse(cmd.OutOrStdout(), resp)
},
}

Expand Down
15 changes: 13 additions & 2 deletions cmd/verbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func addVerbs(root *cobra.Command) {
Long: "list Rockset resources",
}

queryCmd := cobra.Command{
Aliases: []string{"q"},
Short: "query resources",
Long: "query Rockset resources",
}

resumeCmd := cobra.Command{
Use: "resume",
Short: "resume resources",
Expand Down Expand Up @@ -105,7 +111,6 @@ func addVerbs(root *cobra.Command) {
deleteCmd.AddCommand(newDeleteCollectionCmd())
getCmd.AddCommand(newGetCollectionCmd())
listCmd.AddCommand(newListCollectionsCmd())
listCmd.AddCommand(newListQueryCmd())
sampleCmd.AddCommand(newCreateSampleCollectionCmd())
tailCmd.AddCommand(newCreateTailCollectionCmd())

Expand All @@ -114,6 +119,13 @@ func addVerbs(root *cobra.Command) {
getCmd.AddCommand(newGetIntegrationCmd())
listCmd.AddCommand(newListIntegrationsCmd())

// query
getCmd.AddCommand(&queryCmd)
listCmd.AddCommand(newListQueriesCmd()) // list queries
queryCmd.AddCommand(newGetQueryInfoCmd()) // get query info
queryCmd.AddCommand(newGetQueryResultCmd()) // get query result
root.AddCommand(newQueryCmd()) // execute a query

// org
getCmd.AddCommand(newGetOrganizationCmd())

Expand Down Expand Up @@ -185,7 +197,6 @@ func addVerbs(root *cobra.Command) {
root.AddCommand(&useCmd)
root.AddCommand(newVersionCmd())

root.AddCommand(newQueryCmd())
root.AddCommand(newIngestCmd())

root.AddCommand(newTestCmd())
Expand Down

0 comments on commit a5ae1e4

Please sign in to comment.