Skip to content

Commit

Permalink
feat: profilecli query-blocks merge
Browse files Browse the repository at this point in the history
  • Loading branch information
alsoba13 committed Oct 15, 2024
1 parent f7e1337 commit 27533b4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 39 deletions.
22 changes: 11 additions & 11 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func main() {
adminCmd := app.Command("admin", "Administrative tasks for Pyroscope cluster operators.")

blocksCmd := adminCmd.Command("blocks", "Operate on Grafana Pyroscope's blocks.")
blocksCmd.Flag("path", "Path to blocks directory").Default("./data/local").StringVar(&cfg.blocks.path)
blocksCmd.Flag("path", "Path to blocks directory").Default("./data/anonymous/local").StringVar(&cfg.blocks.path)

blocksListCmd := blocksCmd.Command("list", "List blocks.")
blocksListCmd.Flag("restore-missing-meta", "").Default("false").BoolVar(&cfg.blocks.restoreMissingMeta)
Expand All @@ -56,6 +56,12 @@ func main() {
blocksCompactCmd.Arg("dest", "The destination where compacted blocks should be stored.").Required().StringVar(&cfg.blocks.compact.dst)
blocksCompactCmd.Flag("shards", "The amount of shards to split output blocks into.").Default("0").IntVar(&cfg.blocks.compact.shards)

blocksQueryCmd := blocksCmd.Command("query", "Query on local/remote blocks.")
blocksQuerySeriesCmd := blocksQueryCmd.Command("series", "Request series labels on local/remote blocks.")
blocksQuerySeriesParams := addBlocksQuerySeriesParams(blocksQuerySeriesCmd)
blocksQueryMergeCmd := blocksQueryCmd.Command("merge", "Request merged profile on local/remote block.")
blocksQueryMergeParams := addBlocksQueryMergeParams(blocksQueryMergeCmd)

parquetCmd := adminCmd.Command("parquet", "Operate on a Parquet file.")
parquetInspectCmd := parquetCmd.Command("inspect", "Inspect a parquet file's structure.")
parquetInspectFiles := parquetInspectCmd.Arg("file", "parquet file path").Required().ExistingFiles()
Expand All @@ -79,12 +85,6 @@ func main() {
queryTracerCmd := app.Command("query-tracer", "Analyze query traces.")
queryTracerParams := addQueryTracerParams(queryTracerCmd)

queryBlocksCmd := app.Command("query-blocks", "Query on local/remote blocks")
queryBlocksSeriesCmd := queryBlocksCmd.Command("series", "Request series labels on local/remote blocks")
queryBlocksSeriesParams := addQueryBlocksSeriesParams(queryBlocksSeriesCmd)
queryBlocksMergeCmd := queryBlocksCmd.Command("merge", "Request merged profile.")
queryBlocksMergeParams := addQueryBlocksMergeParams(queryBlocksMergeCmd)

uploadCmd := app.Command("upload", "Upload profile(s).")
uploadParams := addUploadParams(uploadCmd)

Expand Down Expand Up @@ -138,12 +138,12 @@ func main() {
os.Exit(checkError(err))
}

case queryBlocksSeriesCmd.FullCommand():
if err := queryBlocksSeries(ctx, queryBlocksSeriesParams); err != nil {
case blocksQuerySeriesCmd.FullCommand():
if err := blocksQuerySeries(ctx, blocksQuerySeriesParams); err != nil {
os.Exit(checkError(err))
}
case queryBlocksMergeCmd.FullCommand():
if err := queryBlocksMerge(ctx, queryBlocksMergeParams); err != nil {
case blocksQueryMergeCmd.FullCommand():
if err := blocksQueryMerge(ctx, blocksQueryMergeParams); err != nil {
os.Exit(checkError(err))
}

Expand Down
54 changes: 26 additions & 28 deletions cmd/profilecli/query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,57 +19,55 @@ import (
"github.com/grafana/pyroscope/pkg/phlaredb"
)

type queryBlocksParams struct {
LocalPath string
type blocksQueryParams struct {
BucketName string
BlockIds []string
TenantID string
ObjectStoreType string
Query string
}

type queryBlocksMergeParams struct {
*queryBlocksParams
type blocksQueryMergeParams struct {
*blocksQueryParams
Output string
ProfileType string
StacktraceSelector []string
}

type queryBlocksSeriesParams struct {
*queryBlocksParams
type blocksQuerySeriesParams struct {
*blocksQueryParams
LabelNames []string
}

func addQueryBlocksParams(queryCmd commander) *queryBlocksParams {
params := new(queryBlocksParams)
queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(&params.LocalPath)
func addBlocksQueryParams(queryCmd commander) *blocksQueryParams {
params := new(blocksQueryParams)
queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(&params.BucketName)
queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(&params.ObjectStoreType)
queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(&params.BlockIds)
queryCmd.Flag("block", "Block ids to query on (accepts multiples)").StringsVar(&params.BlockIds)
queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(&params.TenantID)
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(&params.Query)
return params
}

func addQueryBlocksMergeParams(queryCmd commander) *queryBlocksMergeParams {
params := new(queryBlocksMergeParams)
params.queryBlocksParams = addQueryBlocksParams(queryCmd)
func addBlocksQueryMergeParams(queryCmd commander) *blocksQueryMergeParams {
params := new(blocksQueryMergeParams)
params.blocksQueryParams = addBlocksQueryParams(queryCmd)
queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").StringVar(&params.Output)
queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(&params.ProfileType)
queryCmd.Flag("stacktrace-selector", "Only query locations with those symbols. Provide multiple times starting with the root").StringsVar(&params.StacktraceSelector)
return params
}

func addQueryBlocksSeriesParams(queryCmd commander) *queryBlocksSeriesParams {
params := new(queryBlocksSeriesParams)
params.queryBlocksParams = addQueryBlocksParams(queryCmd)
func addBlocksQuerySeriesParams(queryCmd commander) *blocksQuerySeriesParams {
params := new(blocksQuerySeriesParams)
params.blocksQueryParams = addBlocksQueryParams(queryCmd)
queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(&params.LabelNames)
return params
}

func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error {
level.Info(logger).Log("msg", "query-block merge", "blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath",
params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID, "query", params.Query, "type", params.ProfileType)
func blocksQueryMerge(ctx context.Context, params *blocksQueryMergeParams) error {
level.Info(logger).Log("msg", "blocks query merge", "blockIds", fmt.Sprintf("%v", params.BlockIds), "path",
cfg.blocks.path, "bucketName", params.BucketName, "tenantId", params.TenantID, "query", params.Query, "type", params.ProfileType)

if len(params.BlockIds) > 1 {
return errors.New("query merge is limited to a single block")
Expand All @@ -94,7 +92,7 @@ func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error
level.Info(logger).Log("msg", "selecting with stackstrace selector", "call-site", fmt.Sprintf("%#+v", params.StacktraceSelector))
}

bucket, err := getBucket(ctx, params.queryBlocksParams)
bucket, err := getBucket(ctx, params.blocksQueryParams)
if err != nil {
return err
}
Expand Down Expand Up @@ -122,11 +120,11 @@ func queryBlocksMerge(ctx context.Context, params *queryBlocksMergeParams) error
return outputMergeProfile(ctx, params.Output, resp)
}

func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) error {
level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames),
"blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID)
func blocksQuerySeries(ctx context.Context, params *blocksQuerySeriesParams) error {
level.Info(logger).Log("msg", "blocks query series", "labelNames", fmt.Sprintf("%v", params.LabelNames),
"blockIds", fmt.Sprintf("%v", params.BlockIds), "path", cfg.blocks.path, "bucketName", params.BucketName, "tenantId", params.TenantID)

bucket, err := getBucket(ctx, params.queryBlocksParams)
bucket, err := getBucket(ctx, params.blocksQueryParams)
if err != nil {
return err
}
Expand All @@ -136,7 +134,7 @@ func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) err
var from, to int64
from, to = math.MaxInt64, math.MinInt64
var targetBlockQueriers phlaredb.Queriers
for _, blockId := range params.queryBlocksParams.BlockIds {
for _, blockId := range params.blocksQueryParams.BlockIds {
meta, err := blockQuerier.BlockMeta(ctx, blockId)
if err != nil {
return err
Expand All @@ -161,15 +159,15 @@ func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) err
return outputSeries(response.Msg.LabelsSet)
}

func getBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) {
func getBucket(ctx context.Context, params *blocksQueryParams) (objstore.Bucket, error) {
if params.BucketName != "" {
return getRemoteBucket(ctx, params)
} else {
return filesystem.NewBucket(params.LocalPath)
return filesystem.NewBucket(cfg.blocks.path)
}
}

func getRemoteBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) {
func getRemoteBucket(ctx context.Context, params *blocksQueryParams) (objstore.Bucket, error) {
if params.TenantID == "" {
return nil, errors.New("specify tenant id for remote bucket")
}
Expand Down

0 comments on commit 27533b4

Please sign in to comment.