Skip to content

Commit

Permalink
feat: profilecli query-blocks merge
Browse files Browse the repository at this point in the history
feat: profilecli query-blocks merge
  • Loading branch information
alsoba13 committed Oct 15, 2024
1 parent 426515a commit c794fba
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 90 deletions.
20 changes: 13 additions & 7 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func main() {
adminCmd := app.Command("admin", "Administrative tasks for Pyroscope cluster operators.")

blocksCmd := adminCmd.Command("blocks", "Operate on Grafana Pyroscope's blocks.")
blocksCmd.Flag("path", "Path to blocks directory").Default("./data/local").StringVar(&cfg.blocks.path)
blocksCmd.Flag("path", "Path to blocks directory").Default("./data/anonymous/local").StringVar(&cfg.blocks.path)

blocksListCmd := blocksCmd.Command("list", "List blocks.")
blocksListCmd.Flag("restore-missing-meta", "").Default("false").BoolVar(&cfg.blocks.restoreMissingMeta)
Expand All @@ -56,6 +56,12 @@ func main() {
blocksCompactCmd.Arg("dest", "The destination where compacted blocks should be stored.").Required().StringVar(&cfg.blocks.compact.dst)
blocksCompactCmd.Flag("shards", "The amount of shards to split output blocks into.").Default("0").IntVar(&cfg.blocks.compact.shards)

blocksQueryCmd := blocksCmd.Command("query", "Query on local/remote blocks.")
blocksQuerySeriesCmd := blocksQueryCmd.Command("series", "Request series labels on local/remote blocks.")
blocksQuerySeriesParams := addBlocksQuerySeriesParams(blocksQuerySeriesCmd)
blocksQueryMergeCmd := blocksQueryCmd.Command("merge", "Request merged profile on local/remote block.")
blocksQueryMergeParams := addBlocksQueryMergeParams(blocksQueryMergeCmd)

parquetCmd := adminCmd.Command("parquet", "Operate on a Parquet file.")
parquetInspectCmd := parquetCmd.Command("inspect", "Inspect a parquet file's structure.")
parquetInspectFiles := parquetInspectCmd.Arg("file", "parquet file path").Required().ExistingFiles()
Expand All @@ -79,10 +85,6 @@ func main() {
queryTracerCmd := app.Command("query-tracer", "Analyze query traces.")
queryTracerParams := addQueryTracerParams(queryTracerCmd)

queryBlocksCmd := app.Command("query-blocks", "Query on local/remote blocks")
queryBlocksSeriesCmd := queryBlocksCmd.Command("series", "Request series labels on local/remote blocks")
queryBlocksSeriesParams := addQueryBlocksSeriesParams(queryBlocksSeriesCmd)

uploadCmd := app.Command("upload", "Upload profile(s).")
uploadParams := addUploadParams(uploadCmd)

Expand Down Expand Up @@ -136,8 +138,12 @@ func main() {
os.Exit(checkError(err))
}

case queryBlocksSeriesCmd.FullCommand():
if err := queryBlocksSeries(ctx, queryBlocksSeriesParams); err != nil {
case blocksQuerySeriesCmd.FullCommand():
if err := queryBlocksSeries(ctx, blocksQuerySeriesParams); err != nil {
os.Exit(checkError(err))
}
case blocksQueryMergeCmd.FullCommand():
if err := blocksQueryMerge(ctx, blocksQueryMergeParams); err != nil {
os.Exit(checkError(err))
}

Expand Down
Binary file removed cmd/profilecli/my.pprof
Binary file not shown.
75 changes: 75 additions & 0 deletions cmd/profilecli/output.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"

gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"

googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

const (
outputConsole = "console"
outputRaw = "raw"
outputPprof = "pprof="
)

func outputSeries(result []*typesv1.Labels) error {
enc := json.NewEncoder(os.Stdout)
m := make(map[string]interface{})
Expand All @@ -21,3 +40,59 @@ func outputSeries(result []*typesv1.Labels) error {
}
return nil
}

func outputMergeProfile(ctx context.Context, outputFlag string, profile *googlev1.Profile) error {
mypp := pp.New()
mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd()))
mypp.SetExportedOnly(true)

if outputFlag == outputConsole {
buf, err := profile.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

p, err := gprofile.Parse(bytes.NewReader(buf))
if err != nil {
return errors.Wrap(err, "failed to parse profile")
}

fmt.Fprintln(output(ctx), p.String())
return nil

}

if outputFlag == outputRaw {
mypp.Print(profile)
return nil
}

if strings.HasPrefix(outputFlag, outputPprof) {
filePath := strings.TrimPrefix(outputFlag, outputPprof)
if filePath == "" {
return errors.New("no file path specified after pprof=")
}
buf, err := profile.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

// open new file, fail when the file already exists
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
return errors.Wrap(err, "failed to create pprof file")
}
defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file")

gzipWriter := gzip.NewWriter(f)
defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer")

if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil {
return errors.Wrap(err, "failed to write pprof")
}

return nil
}

return errors.Errorf("unknown output %s", outputFlag)
}
101 changes: 86 additions & 15 deletions cmd/profilecli/query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/pkg/errors"

ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/objstore"
objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client"
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
Expand All @@ -18,42 +20,111 @@ import (
)

type queryBlocksParams struct {
LocalPath string
BucketName string
BlockIds []string
TenantID string
ObjectStoreType string
Query string
}

type queryBlocksSeriesParams struct {
type blocksQueryMergeParams struct {
*queryBlocksParams
Output string
ProfileType string
StacktraceSelector []string
}

type blocksQuerySeriesParams struct {
*queryBlocksParams
LabelNames []string
}

func addQueryBlocksParams(queryCmd commander) *queryBlocksParams {
func addBlocksQueryParams(queryCmd commander) *queryBlocksParams {
params := new(queryBlocksParams)
queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(&params.LocalPath)
queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(&params.BucketName)
queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(&params.ObjectStoreType)
queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(&params.BlockIds)
queryCmd.Flag("block", "Block ids to query on (accepts multiples)").StringsVar(&params.BlockIds)
queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(&params.TenantID)
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(&params.Query)
return params
}

func addQueryBlocksSeriesParams(queryCmd commander) *queryBlocksSeriesParams {
params := new(queryBlocksSeriesParams)
params.queryBlocksParams = addQueryBlocksParams(queryCmd)
func addBlocksQueryMergeParams(queryCmd commander) *blocksQueryMergeParams {
params := new(blocksQueryMergeParams)
params.queryBlocksParams = addBlocksQueryParams(queryCmd)
queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").StringVar(&params.Output)
queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(&params.ProfileType)
queryCmd.Flag("stacktrace-selector", "Only query locations with those symbols. Provide multiple times starting with the root").StringsVar(&params.StacktraceSelector)
return params
}

func addBlocksQuerySeriesParams(queryCmd commander) *blocksQuerySeriesParams {
params := new(blocksQuerySeriesParams)
params.queryBlocksParams = addBlocksQueryParams(queryCmd)
queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(&params.LabelNames)
return params
}

func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) error {
level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames),
"blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID)
func blocksQueryMerge(ctx context.Context, params *blocksQueryMergeParams) error {
level.Info(logger).Log("msg", "blocks query merge", "blockIds", fmt.Sprintf("%v", params.BlockIds), "path",
cfg.blocks.path, "bucketName", params.BucketName, "tenantId", params.TenantID, "query", params.Query, "type", params.ProfileType)

if len(params.BlockIds) > 1 {
return errors.New("query merge is limited to a single block")
}

profileType, err := model.ParseProfileTypeSelector(params.ProfileType)
if err != nil {
return err
}

var stackTraceSelectors *typesv1.StackTraceSelector = nil
if len(params.StacktraceSelector) > 0 {
locations := make([]*typesv1.Location, 0, len(params.StacktraceSelector))
for _, cs := range params.StacktraceSelector {
locations = append(locations, &typesv1.Location{
Name: cs,
})
}
stackTraceSelectors = &typesv1.StackTraceSelector{
CallSite: locations,
}
level.Info(logger).Log("msg", "selecting with stackstrace selector", "call-site", fmt.Sprintf("%#+v", params.StacktraceSelector))
}

bucket, err := getBucket(ctx, params.queryBlocksParams)
if err != nil {
return err
}

meta, err := phlaredb.NewBlockQuerier(ctx, bucket).BlockMeta(ctx, params.BlockIds[0])
if err != nil {
return err
}

resp, err := phlaredb.NewSingleBlockQuerierFromMeta(ctx, bucket, meta).SelectMergePprof(
ctx,
&ingestv1.SelectProfilesRequest{
LabelSelector: params.Query,
Type: profileType,
Start: meta.MinTime.Time().UnixMilli(),
End: meta.MaxTime.Time().UnixMilli(),
},
0,
stackTraceSelectors,
)
if err != nil {
return errors.Wrap(err, "failed to query")
}

return outputMergeProfile(ctx, params.Output, resp)
}

func queryBlocksSeries(ctx context.Context, params *blocksQuerySeriesParams) error {
level.Info(logger).Log("msg", "blocks query series", "labelNames", fmt.Sprintf("%v", params.LabelNames),
"blockIds", fmt.Sprintf("%v", params.BlockIds), "path", cfg.blocks.path, "bucketName", params.BucketName, "tenantId", params.TenantID)

bucket, err := getBucket(ctx, params)
bucket, err := getBucket(ctx, params.queryBlocksParams)
if err != nil {
return err
}
Expand Down Expand Up @@ -88,15 +159,15 @@ func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) err
return outputSeries(response.Msg.LabelsSet)
}

func getBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) {
func getBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) {
if params.BucketName != "" {
return getRemoteBucket(ctx, params)
} else {
return filesystem.NewBucket(params.LocalPath)
return filesystem.NewBucket(cfg.blocks.path)
}
}

func getRemoteBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) {
func getRemoteBucket(ctx context.Context, params *queryBlocksParams) (objstore.Bucket, error) {
if params.TenantID == "" {
return nil, errors.New("specify tenant id for remote bucket")
}
Expand Down
69 changes: 1 addition & 68 deletions cmd/profilecli/query.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"os"
"sort"
"strings"
"time"

"connectrpc.com/connect"
"github.com/dustin/go-humanize"
"github.com/go-kit/log/level"
gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
"github.com/mattn/go-isatty"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand All @@ -32,12 +23,6 @@ import (
"github.com/grafana/pyroscope/pkg/operations"
)

const (
outputConsole = "console"
outputRaw = "raw"
outputPprof = "pprof="
)

func (c *phlareClient) queryClient() querierv1connect.QuerierServiceClient {
return querierv1connect.NewQuerierServiceClient(
c.httpClient(),
Expand Down Expand Up @@ -156,59 +141,7 @@ func selectMergeProfile(ctx context.Context, client *phlareClient, outputFlag st
return errors.Wrap(err, "failed to query")
}

mypp := pp.New()
mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd()))
mypp.SetExportedOnly(true)

if outputFlag == outputConsole {
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

p, err := gprofile.Parse(bytes.NewReader(buf))
if err != nil {
return errors.Wrap(err, "failed to parse profile")
}

fmt.Fprintln(output(ctx), p.String())
return nil

}

if outputFlag == outputRaw {
mypp.Print(resp.Msg)
return nil
}

if strings.HasPrefix(outputFlag, outputPprof) {
filePath := strings.TrimPrefix(outputFlag, outputPprof)
if filePath == "" {
return errors.New("no file path specified after pprof=")
}
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

// open new file, fail when the file already exists
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
return errors.Wrap(err, "failed to create pprof file")
}
defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file")

gzipWriter := gzip.NewWriter(f)
defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer")

if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil {
return errors.Wrap(err, "failed to write pprof")
}

return nil
}

return errors.Errorf("unknown output %s", outputFlag)
return outputMergeProfile(ctx, outputFlag, resp.Msg)
}

type queryGoPGOParams struct {
Expand Down

0 comments on commit c794fba

Please sign in to comment.