From bf3de86551c8c538ee5b173b9c833efcab343f86 Mon Sep 17 00:00:00 2001 From: Alberto Date: Wed, 16 Oct 2024 11:57:01 +0200 Subject: [PATCH] feat: profilecli query-blocks merge (#3625) --- cmd/profilecli/main.go | 22 +++++++------- cmd/profilecli/query-blocks.go | 54 ++++++++++++++++------------------ 2 files changed, 37 insertions(+), 39 deletions(-) diff --git a/cmd/profilecli/main.go b/cmd/profilecli/main.go index ca2921aed7..9f293f8032 100644 --- a/cmd/profilecli/main.go +++ b/cmd/profilecli/main.go @@ -46,7 +46,7 @@ func main() { adminCmd := app.Command("admin", "Administrative tasks for Pyroscope cluster operators.") blocksCmd := adminCmd.Command("blocks", "Operate on Grafana Pyroscope's blocks.") - blocksCmd.Flag("path", "Path to blocks directory").Default("./data/local").StringVar(&cfg.blocks.path) + blocksCmd.Flag("path", "Path to blocks directory").Default("./data/anonymous/local").StringVar(&cfg.blocks.path) blocksListCmd := blocksCmd.Command("list", "List blocks.") blocksListCmd.Flag("restore-missing-meta", "").Default("false").BoolVar(&cfg.blocks.restoreMissingMeta) @@ -56,6 +56,12 @@ func main() { blocksCompactCmd.Arg("dest", "The destination where compacted blocks should be stored.").Required().StringVar(&cfg.blocks.compact.dst) blocksCompactCmd.Flag("shards", "The amount of shards to split output blocks into.").Default("0").IntVar(&cfg.blocks.compact.shards) + blocksQueryCmd := blocksCmd.Command("query", "Query on local/remote blocks.") + blocksQuerySeriesCmd := blocksQueryCmd.Command("series", "Request series labels on local/remote blocks.") + blocksQuerySeriesParams := addBlocksQuerySeriesParams(blocksQuerySeriesCmd) + blocksQueryMergeCmd := blocksQueryCmd.Command("merge", "Request merged profile on local/remote block.") + blocksQueryMergeParams := addBlocksQueryMergeParams(blocksQueryMergeCmd) + parquetCmd := adminCmd.Command("parquet", "Operate on a Parquet file.") parquetInspectCmd := parquetCmd.Command("inspect", "Inspect a parquet file's structure.") parquetInspectFiles := parquetInspectCmd.Arg("file", "parquet file path").Required().ExistingFiles() @@ -79,12 +85,6 @@ func main() { queryTracerCmd := app.Command("query-tracer", "Analyze query traces.") queryTracerParams := addQueryTracerParams(queryTracerCmd) - queryBlocksCmd := app.Command("query-blocks", "Query on local/remote blocks") - queryBlocksSeriesCmd := queryBlocksCmd.Command("series", "Request series labels on local/remote blocks") - queryBlocksSeriesParams := addQueryBlocksSeriesParams(queryBlocksSeriesCmd) - queryBlocksMergeCmd := queryBlocksCmd.Command("merge", "Request merged profile.") - queryBlocksMergeParams := addQueryBlocksMergeParams(queryBlocksMergeCmd) - uploadCmd := app.Command("upload", "Upload profile(s).") uploadParams := addUploadParams(uploadCmd) @@ -138,12 +138,12 @@ func main() { os.Exit(checkError(err)) } - case queryBlocksSeriesCmd.FullCommand(): - if err := queryBlocksSeries(ctx, queryBlocksSeriesParams); err != nil { + case blocksQuerySeriesCmd.FullCommand(): + if err := blocksQuerySeries(ctx, blocksQuerySeriesParams); err != nil { os.Exit(checkError(err)) } - case queryBlocksMergeCmd.FullCommand(): - if err := queryBlocksMerge(ctx, queryBlocksMergeParams); err != nil { + case blocksQueryMergeCmd.FullCommand(): + if err := blocksQueryMerge(ctx, blocksQueryMergeParams); err != nil { os.Exit(checkError(err)) } diff --git a/cmd/profilecli/query-blocks.go b/cmd/profilecli/query-blocks.go index a973a71433..ecdb74c95b 100644 --- a/cmd/profilecli/query-blocks.go +++ b/cmd/profilecli/query-blocks.go @@ -19,8 +19,7 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb" ) -type queryBlocksParams struct { - LocalPath string +type blocksQueryParams struct { BucketName string BlockIds []string TenantID string @@ -28,48 +27,47 @@ type queryBlocksParams struct { Query string } -type queryBlocksMergeParams struct { - *queryBlocksParams +type blocksQueryMergeParams struct { + *blocksQueryParams Output string ProfileType string StacktraceSelector []string } -type queryBlocksSeriesParams struct { - *queryBlocksParams +type blocksQuerySeriesParams struct { + *blocksQueryParams LabelNames []string } -func addQueryBlocksParams(queryCmd commander) *queryBlocksParams { - params := new(queryBlocksParams) - queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(¶ms.LocalPath) +func addBlocksQueryParams(queryCmd commander) *blocksQueryParams { + params := new(blocksQueryParams) queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(¶ms.BucketName) queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(¶ms.ObjectStoreType) - queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(¶ms.BlockIds) + queryCmd.Flag("block", "Block ids to query on (accepts multiples)").StringsVar(¶ms.BlockIds) queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(¶ms.TenantID) queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(¶ms.Query) return params } -func addQueryBlocksMergeParams(queryCmd commander) *queryBlocksMergeParams { - params := new(queryBlocksMergeParams) - params.queryBlocksParams = addQueryBlocksParams(queryCmd) +func addBlocksQueryMergeParams(queryCmd commander) *blocksQueryMergeParams { + params := new(blocksQueryMergeParams) + params.blocksQueryParams = addBlocksQueryParams(queryCmd) queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").StringVar(¶ms.Output) queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(¶ms.ProfileType) queryCmd.Flag("stacktrace-selector", "Only query locations with those symbols. Provide multiple times starting with the root").StringsVar(¶ms.StacktraceSelector) return params } -func addQueryBlocksSeriesParams(queryCmd commander) *queryBlocksSeriesParams { - params := new(queryBlocksSeriesParams) - params.queryBlocksParams = addQueryBlocksParams(queryCmd) +func addBlocksQuerySeriesParams(queryCmd commander) *blocksQuerySeriesParams { + params := new(blocksQuerySeriesParams) + params.blocksQueryParams = addBlocksQueryParams(queryCmd) queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(¶ms.LabelNames) return params } -func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error { - level.Info(logger).Log("msg", "query-block merge", "blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", - params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID, "query", params.Query, "type", params.ProfileType) +func blocksQueryMerge(ctx context.Context, params *blocksQueryMergeParams) error { + level.Info(logger).Log("msg", "blocks query merge", "blockIds", fmt.Sprintf("%v", params.BlockIds), "path", + cfg.blocks.path, "bucketName", params.BucketName, "tenantId", params.TenantID, "query", params.Query, "type", params.ProfileType) if len(params.BlockIds) > 1 { return errors.New("query merge is limited to a single block") @@ -94,7 +92,7 @@ func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error level.Info(logger).Log("msg", "selecting with stackstrace selector", "call-site", fmt.Sprintf("%#+v", params.StacktraceSelector)) } - bucket, err := getBucket(ctx, params.queryBlocksParams) + bucket, err := getBucket(ctx, params.blocksQueryParams) if err != nil { return err } @@ -122,11 +120,11 @@ func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error return outputMergeProfile(ctx, params.Output, resp) } -func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) error { - level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames), - "blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID) +func blocksQuerySeries(ctx context.Context, params *blocksQuerySeriesParams) error { + level.Info(logger).Log("msg", "blocks query series", "labelNames", fmt.Sprintf("%v", params.LabelNames), + "blockIds", fmt.Sprintf("%v", params.BlockIds), "path", cfg.blocks.path, "bucketName", params.BucketName, "tenantId", params.TenantID) - bucket, err := getBucket(ctx, params.queryBlocksParams) + bucket, err := getBucket(ctx, params.blocksQueryParams) if err != nil { return err } @@ -136,7 +134,7 @@ func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) err var from, to int64 from, to = math.MaxInt64, math.MinInt64 var targetBlockQueriers phlaredb.Queriers - for _, blockId := range params.queryBlocksParams.BlockIds { + for _, blockId := range params.blocksQueryParams.BlockIds { meta, err := blockQuerier.BlockMeta(ctx, blockId) if err != nil { return err @@ -161,15 +159,15 @@ func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) err return outputSeries(response.Msg.LabelsSet) } -func getBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) { +func getBucket(ctx context.Context, params *blocksQueryParams) (objstore.Bucket, error) { if params.BucketName != "" { return getRemoteBucket(ctx, params) } else { - return filesystem.NewBucket(params.LocalPath) + return filesystem.NewBucket(cfg.blocks.path) } } -func getRemoteBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) { +func getRemoteBucket(ctx context.Context, params *blocksQueryParams) (objstore.Bucket, error) { if params.TenantID == "" { return nil, errors.New("specify tenant id for remote bucket") }