From a5ae1e44ee69b0583c29093eb39708eb74cde1eb Mon Sep 17 00:00:00 2001 From: Martin Englund Date: Wed, 1 Nov 2023 13:59:15 -0700 Subject: [PATCH] add more query commands --- cmd/flags.go | 4 + cmd/query.go | 190 ++++++++++++++++++++++++++++++++++---------- cmd/query_lambda.go | 2 +- cmd/verbs.go | 15 +++- 4 files changed, 168 insertions(+), 43 deletions(-) diff --git a/cmd/flags.go b/cmd/flags.go index 9e5a368..b759531 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -18,15 +18,19 @@ const ( // command specific flags const ( + AsyncFlag = "async" BucketFlag = "bucket" CompressionFlag = "compression" CollectionFlag = "collection" + CursorFlag = "cursor" DatasetFlag = "dataset" DescriptionFlag = "description" + DocsFlag = "docs" FileFlag = "file" ForceFlag = "force" IngestTransformation = "ingest-transformation" IntegrationFlag = "integration" + OffsetFlag = "offset" PatternFlag = "pattern" RegionFlag = "region" RetentionFlag = "retention" diff --git a/cmd/query.go b/cmd/query.go index 13b1891..93cefc2 100644 --- a/cmd/query.go +++ b/cmd/query.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "fmt" "io" "os" @@ -13,18 +14,19 @@ import ( "github.com/rockset/rockset-go-client" "github.com/rockset/rockset-go-client/openapi" + "github.com/rockset/rockset-go-client/option" "github.com/rockset/cli/config" "github.com/rockset/cli/format" "github.com/rockset/cli/tui" ) -func newListQueryCmd() *cobra.Command { +func newListQueriesCmd() *cobra.Command { cmd := cobra.Command{ - Use: "query [virtual instance ID]", - Aliases: []string{"queries", "q"}, + Use: "queries [virtual instance ID]", + Aliases: []string{"query", "q"}, Short: "list queries", - Long: "list all active queries, or on a specific virtual instance", + Long: "list all actively queued and executing queries, or on a specific virtual instance", Args: cobra.RangeArgs(0, 1), Annotations: group("query"), // TODO should this be in the VI group too? RunE: func(cmd *cobra.Command, args []string) error { @@ -54,6 +56,91 @@ func newListQueryCmd() *cobra.Command { return &cmd } +func newGetQueryInfoCmd() *cobra.Command { + cmd := cobra.Command{ + Use: "info ID", + Aliases: []string{"i"}, + Short: "get query info", + Long: "get information about a query", + Args: cobra.ExactArgs(1), + Annotations: group("query"), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + rs, err := rockClient(cmd) + if err != nil { + return err + } + + q, err := rs.GetQueryInfo(ctx, args[0]) + if err != nil { + return err + } + + return formatOne(cmd, q) + }, + } + + cmd.Flags().Bool(WideFlag, false, "display more information") + + return &cmd +} + +func newGetQueryResultCmd() *cobra.Command { + cmd := cobra.Command{ + Use: "results ID", + Aliases: []string{"r"}, + Short: "get query results", + Long: "get query results", + Args: cobra.ExactArgs(1), + Annotations: group("query"), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + rs, err := rockClient(cmd) + if err != nil { + return err + } + + // have to get info to get thew query stats + info, err := rs.GetQueryInfo(ctx, args[0]) + if err != nil { + return err + } + + var options []option.QueryResultOption + if cursor, _ := cmd.Flags().GetString(CursorFlag); cursor != "" { + options = append(options, option.WithQueryResultCursor(cursor)) + } + if docs, _ := cmd.Flags().GetInt32(DocsFlag); docs != 0 { + options = append(options, option.WithQueryResultDocs(docs)) + } + if offset, _ := cmd.Flags().GetInt32(OffsetFlag); offset != 0 { + options = append(options, option.WithQueryResultOffset(offset)) + } + + result, err := rs.GetQueryResults(ctx, args[0], options...) + if err != nil { + return err + } + page := result.GetPagination() + stats := info.GetStats() + + // TODO should we add the option to set an environment variable e.g. ROCKSET_CURSOR + // with the "next cursor value" so consecutive calls gets the next page of results? + + return showQueryPaginationResponse(cmd.OutOrStdout(), page.GetNextCursor(), stats.GetElapsedTimeMs(), result.Results) + }, + } + + cmd.Flags().Bool(WideFlag, false, "display more information") + cmd.Flags().String(CursorFlag, "", "cursor to current page, defaults to first page") + cmd.Flags().Int32(DocsFlag, 0, "number of documents to fetch, 0 means fetch max") + cmd.Flags().Int32(OffsetFlag, 0, "offset from the cursor of the first document to be returned") + + return &cmd +} + func newQueryCmd() *cobra.Command { cmd := cobra.Command{ Use: "query SQL", @@ -70,6 +157,7 @@ func newQueryCmd() *cobra.Command { vi, _ := cmd.Flags().GetString("vi") file, _ := cmd.Flags().GetString(FileFlag) validate, _ := cmd.Flags().GetBool(ValidateFlag) + async, _ := cmd.Flags().GetBool(AsyncFlag) // TODO handle a parameterized query var sql string @@ -103,20 +191,31 @@ func newQueryCmd() *cobra.Command { } _, _ = fmt.Fprintf(cmd.OutOrStdout(), "SQL is valid\n") + return nil + } + + var options []option.QueryOption + if async { + // TODO inform the user that --validate and --async are mutually exclusive? + options = append(options, option.WithAsync()) } var result openapi.QueryResponse if vi == "" { - result, err = rs.Query(ctx, sql) + result, err = rs.Query(ctx, sql, options...) } else { - result, err = rs.ExecuteQueryOnVirtualInstance(ctx, vi, sql) + result, err = rs.ExecuteQueryOnVirtualInstance(ctx, vi, sql, options...) } - if err != nil { return err } - err = showQueryResult(cmd.OutOrStdout(), result) + if async { + _, _ = fmt.Fprintf(cmd.OutOrStdout(), "query ID is: %s\n", result.GetQueryId()) + return nil + } + + err = showQueryResponse(cmd.OutOrStdout(), result) if err != nil { return err } @@ -125,6 +224,7 @@ func newQueryCmd() *cobra.Command { }, } + cmd.Flags().Bool(AsyncFlag, false, "execute the query asynchronously") cmd.Flags().Bool(ValidateFlag, false, "validate SQL") cmd.Flags().String(FileFlag, "", "read SQL from file") cmd.Flags().String("vi", "", "execute query on virtual instance") @@ -133,7 +233,22 @@ func newQueryCmd() *cobra.Command { return &cmd } -func showQueryResult(out io.Writer, result openapi.QueryResponse) error { +func showQueryPaginationResponse(out io.Writer, cursor string, elapsedMs int64, results []map[string]interface{}) error { + if len(results) == 0 { + return errors.New("query returned no rows") + } + + var headers []string + for h := range results[0] { + headers = append(headers, h) + } + + showQueryResponseTable(out, cursor, elapsedMs, headers, results) + + return nil +} + +func showQueryResponse(out io.Writer, result openapi.QueryResponse) error { switch result.GetStatus() { case "ERROR": var errs []string @@ -144,43 +259,19 @@ func showQueryResult(out io.Writer, result openapi.QueryResponse) error { case "QUEUED", "RUNNING": _, _ = fmt.Fprintf(out, "your query %s is %s\n", result.GetQueryId(), result.GetStatus()) case "COMPLETED": - t := tui.NewTable(out) - + var headers []string if len(result.GetColumnFields()) == 0 { - // in a "SELECT *" query the ColumnFields isn't populated - // what order should the columns be presented in? - - var headers []string - for k := range result.Results[0] { - headers = append(headers, k) - } - t.Headers(headers...) - - for _, row := range result.Results { - var r []string - for _, h := range headers { - r = append(r, fmt.Sprintf("%v", row[h])) - } - t.Row(r...) + // in a "SELECT *" query the ColumnFields isn't populated what order should the columns be presented in? + for h := range result.Results[0] { + headers = append(headers, h) } } else { - var headers []string for _, h := range result.GetColumnFields() { headers = append(headers, h.Name) } - t.Headers(headers...) - - for _, row := range result.Results { - var r []string - for _, column := range result.GetColumnFields() { - r = append(r, fmt.Sprintf("%v", row[column.GetName()])) - } - t.Row(r...) - } } - - _, _ = fmt.Fprintln(out, t.Render()) - _, _ = fmt.Fprintf(out, "Elapsed time: %d ms\n\n", result.Stats.GetElapsedTimeMs()) + stats := result.GetStats() + showQueryResponseTable(out, "", stats.GetElapsedTimeMs(), headers, result.Results) default: return fmt.Errorf("unexpected query status: %s", result.GetStatus()) } @@ -188,6 +279,25 @@ func showQueryResult(out io.Writer, result openapi.QueryResponse) error { return nil } +func showQueryResponseTable(out io.Writer, cursor string, elapsedMs int64, headers []string, results []map[string]interface{}) { + t := tui.NewTable(out) + t.Headers(headers...) + + for _, row := range results { + var r []string + for _, h := range headers { + r = append(r, fmt.Sprintf("%v", row[h])) + } + t.Row(r...) + } + + _, _ = fmt.Fprintln(out, t.Render()) + if cursor != "" { + _, _ = fmt.Fprintf(out, "Next cursor: %s\n", cursor) + } + _, _ = fmt.Fprintf(out, "Elapsed time: %d ms\n\n", elapsedMs) +} + func interactiveQuery(ctx context.Context, in io.ReadCloser, out io.Writer, rs *rockset.RockClient) error { histFile, err := config.HistoryFile() if err != nil { @@ -259,7 +369,7 @@ func executeQuery(ctx context.Context, out io.Writer, rs *rockset.RockClient, sq return } - if err = showQueryResult(out, result); err != nil { + if err = showQueryResponse(out, result); err != nil { slog.Error("failed to show result", err) } } diff --git a/cmd/query_lambda.go b/cmd/query_lambda.go index 06693db..7fbeb2e 100644 --- a/cmd/query_lambda.go +++ b/cmd/query_lambda.go @@ -191,7 +191,7 @@ func NewExecuteQueryLambdaCmd() *cobra.Command { return err } - return showQueryResult(cmd.OutOrStdout(), resp) + return showQueryResponse(cmd.OutOrStdout(), resp) }, } diff --git a/cmd/verbs.go b/cmd/verbs.go index 2ba7f4f..f4bf1bf 100644 --- a/cmd/verbs.go +++ b/cmd/verbs.go @@ -45,6 +45,12 @@ func addVerbs(root *cobra.Command) { Long: "list Rockset resources", } + queryCmd := cobra.Command{ + Aliases: []string{"q"}, + Short: "query resources", + Long: "query Rockset resources", + } + resumeCmd := cobra.Command{ Use: "resume", Short: "resume resources", @@ -105,7 +111,6 @@ func addVerbs(root *cobra.Command) { deleteCmd.AddCommand(newDeleteCollectionCmd()) getCmd.AddCommand(newGetCollectionCmd()) listCmd.AddCommand(newListCollectionsCmd()) - listCmd.AddCommand(newListQueryCmd()) sampleCmd.AddCommand(newCreateSampleCollectionCmd()) tailCmd.AddCommand(newCreateTailCollectionCmd()) @@ -114,6 +119,13 @@ func addVerbs(root *cobra.Command) { getCmd.AddCommand(newGetIntegrationCmd()) listCmd.AddCommand(newListIntegrationsCmd()) + // query + getCmd.AddCommand(&queryCmd) + listCmd.AddCommand(newListQueriesCmd()) // list queries + queryCmd.AddCommand(newGetQueryInfoCmd()) // get query info + queryCmd.AddCommand(newGetQueryResultCmd()) // get query result + root.AddCommand(newQueryCmd()) // execute a query + // org getCmd.AddCommand(newGetOrganizationCmd()) @@ -185,7 +197,6 @@ func addVerbs(root *cobra.Command) { root.AddCommand(&useCmd) root.AddCommand(newVersionCmd()) - root.AddCommand(newQueryCmd()) root.AddCommand(newIngestCmd()) root.AddCommand(newTestCmd())