Skip to content

Commit

Permalink
feat: profilecli query-blocks series (#3610)
Browse files Browse the repository at this point in the history
* feat: profilecli query-blocks series

* pr comments
  • Loading branch information
alsoba13 authored Oct 8, 2024
1 parent 0715df3 commit 426515a
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 19 deletions.
9 changes: 9 additions & 0 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func main() {
queryTracerCmd := app.Command("query-tracer", "Analyze query traces.")
queryTracerParams := addQueryTracerParams(queryTracerCmd)

queryBlocksCmd := app.Command("query-blocks", "Query on local/remote blocks")
queryBlocksSeriesCmd := queryBlocksCmd.Command("series", "Request series labels on local/remote blocks")
queryBlocksSeriesParams := addQueryBlocksSeriesParams(queryBlocksSeriesCmd)

uploadCmd := app.Command("upload", "Upload profile(s).")
uploadParams := addUploadParams(uploadCmd)

Expand Down Expand Up @@ -132,6 +136,11 @@ func main() {
os.Exit(checkError(err))
}

case queryBlocksSeriesCmd.FullCommand():
if err := queryBlocksSeries(ctx, queryBlocksSeriesParams); err != nil {
os.Exit(checkError(err))
}

case queryLabelValuesCardinalityCmd.FullCommand():
if err := queryLabelValuesCardinality(ctx, queryLabelValuesCardinalityParams); err != nil {
os.Exit(checkError(err))
Expand Down
23 changes: 23 additions & 0 deletions cmd/profilecli/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package main

import (
"encoding/json"
"os"

typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
)

func outputSeries(result []*typesv1.Labels) error {
enc := json.NewEncoder(os.Stdout)
m := make(map[string]interface{})
for _, s := range result {
clear(m)
for _, l := range s.Labels {
m[l.Name] = l.Value
}
if err := enc.Encode(m); err != nil {
return err
}
}
return nil
}
112 changes: 112 additions & 0 deletions cmd/profilecli/query-blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package main

import (
"context"
"fmt"
"math"

"connectrpc.com/connect"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
"github.com/grafana/pyroscope/pkg/objstore"
objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client"
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
"github.com/grafana/pyroscope/pkg/objstore/providers/gcs"
"github.com/grafana/pyroscope/pkg/phlaredb"
)

type queryBlocksParams struct {
LocalPath string
BucketName string
BlockIds []string
TenantID string
ObjectStoreType string
Query string
}

type queryBlocksSeriesParams struct {
*queryBlocksParams
LabelNames []string
}

func addQueryBlocksParams(queryCmd commander) *queryBlocksParams {
params := new(queryBlocksParams)
queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(&params.LocalPath)
queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(&params.BucketName)
queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(&params.ObjectStoreType)
queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(&params.BlockIds)
queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(&params.TenantID)
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(&params.Query)
return params
}

func addQueryBlocksSeriesParams(queryCmd commander) *queryBlocksSeriesParams {
params := new(queryBlocksSeriesParams)
params.queryBlocksParams = addQueryBlocksParams(queryCmd)
queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(&params.LabelNames)
return params
}

func queryBlocksSeries(ctx context.Context, params *queryBlocksSeriesParams) error {
level.Info(logger).Log("msg", "query-block series", "labelNames", fmt.Sprintf("%v", params.LabelNames),
"blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenantID)

bucket, err := getBucket(ctx, params)
if err != nil {
return err
}

blockQuerier := phlaredb.NewBlockQuerier(ctx, bucket)

var from, to int64
from, to = math.MaxInt64, math.MinInt64
var targetBlockQueriers phlaredb.Queriers
for _, blockId := range params.queryBlocksParams.BlockIds {
meta, err := blockQuerier.BlockMeta(ctx, blockId)
if err != nil {
return err
}
from = min(from, meta.MinTime.Time().UnixMilli())
to = max(to, meta.MaxTime.Time().UnixMilli())
targetBlockQueriers = append(targetBlockQueriers, phlaredb.NewSingleBlockQuerierFromMeta(ctx, bucket, meta))
}

response, err := targetBlockQueriers.Series(ctx, connect.NewRequest(
&ingestv1.SeriesRequest{
Start: from,
End: to,
Matchers: []string{params.Query},
LabelNames: params.LabelNames,
},
))
if err != nil {
return err
}

return outputSeries(response.Msg.LabelsSet)
}

func getBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) {
if params.BucketName != "" {
return getRemoteBucket(ctx, params)
} else {
return filesystem.NewBucket(params.LocalPath)
}
}

func getRemoteBucket(ctx context.Context, params *queryBlocksSeriesParams) (objstore.Bucket, error) {
if params.TenantID == "" {
return nil, errors.New("specify tenant id for remote bucket")
}
return objstoreclient.NewBucket(ctx, objstoreclient.Config{
StorageBackendConfig: objstoreclient.StorageBackendConfig{
Backend: params.ObjectStoreType,
GCS: gcs.Config{
BucketName: params.BucketName,
},
},
StoragePrefix: fmt.Sprintf("%s/phlaredb", params.TenantID),
}, params.BucketName)
}
19 changes: 2 additions & 17 deletions cmd/profilecli/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -320,22 +319,8 @@ func querySeries(ctx context.Context, params *querySeriesParams) (err error) {
return errors.Errorf("unknown api type %s", params.APIType)
}

enc := json.NewEncoder(os.Stdout)
m := make(map[string]interface{})
for _, s := range result {
for k := range m {
delete(m, k)
}
for _, l := range s.Labels {
m[l.Name] = l.Value
}
if err := enc.Encode(m); err != nil {
return err
}
}

return nil

err = outputSeries(result)
return err
}

type queryLabelValuesCardinalityParams struct {
Expand Down
17 changes: 17 additions & 0 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,23 @@ func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ e
return metas[0 : pos+1], nil
}

func (b *BlockQuerier) BlockMeta(ctx context.Context, name string) (meta *block.Meta, _ error) {
path := filepath.Join(name, block.MetaFilename)
metaReader, err := b.bkt.Get(ctx, path)
if err != nil {
level.Error(b.logger).Log("msg", "error reading block meta", "block", path, "err", err)
return nil, err
}

meta, err = block.Read(metaReader)
if err != nil {
level.Error(b.logger).Log("msg", "error parsing block meta", "block", path, "err", err)
return nil, err
}

return meta, nil
}

// Sync gradually scans the available blocks. If there are any changes to the
// last run it will Open/Close new/no longer existing ones.
func (b *BlockQuerier) Sync(ctx context.Context) error {
Expand Down
25 changes: 23 additions & 2 deletions pkg/phlaredb/block_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/grafana/pyroscope/pkg/pprof/testhelper"
)

const testDataPath = "./block/testdata/"

func TestQuerierBlockEviction(t *testing.T) {
type testCase struct {
blocks []string
Expand Down Expand Up @@ -105,7 +107,7 @@ func (p *profileCounter) Next() bool {
}

func TestBlockCompatability(t *testing.T) {
path := "./block/testdata/"
path := testDataPath
bucket, err := filesystem.NewBucket(path)
require.NoError(t, err)

Expand Down Expand Up @@ -156,7 +158,7 @@ func TestBlockCompatability(t *testing.T) {
}

func TestBlockCompatability_SelectMergeSpans(t *testing.T) {
path := "./block/testdata/"
path := testDataPath
bucket, err := filesystem.NewBucket(path)
require.NoError(t, err)

Expand Down Expand Up @@ -1386,3 +1388,22 @@ func testSelectMergeByStacktracesRace(t testing.TB, times int) {
require.NoError(t, g.Wait())
require.NoError(t, querier.Close())
}

func TestBlockMeta_loadsMetasIndividually(t *testing.T) {
path := testDataPath
bucket, err := filesystem.NewBucket(path)
require.NoError(t, err)

ctx := context.Background()
blockQuerier := NewBlockQuerier(ctx, bucket)
metas, err := blockQuerier.BlockMetas(ctx)
require.NoError(t, err)
require.NotEmpty(t, metas)

for _, meta := range metas {
singleMeta, err := blockQuerier.BlockMeta(ctx, meta.ULID.String())
require.NoError(t, err)

require.Equal(t, meta, singleMeta)
}
}

0 comments on commit 426515a

Please sign in to comment.