Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow filtering data by pprof labels #3365

Merged
merged 2 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 71 additions & 40 deletions pkg/parca/parca.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
goruntime "runtime"
"runtime/pprof"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -435,7 +436,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
ctx, cancel := context.WithCancel(ctx)
gr.Add(
func() error {
return s.Run(ctx, symbolizationInterval)
var err error

pprof.Do(ctx, pprof.Labels("parca_component", "symbolizer"), func(ctx context.Context) {
err = s.Run(ctx, symbolizationInterval)
})

return err
},
func(_ error) {
level.Debug(logger).Log("msg", "symbolizer server shutting down")
Expand All @@ -444,7 +451,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
}
gr.Add(
func() error {
return discoveryManager.Run()
var err error

pprof.Do(ctx, pprof.Labels("parca_component", "discovery"), func(_ context.Context) {
err = discoveryManager.Run()
})

return err
},
func(_ error) {
level.Debug(logger).Log("msg", "discovery manager exiting")
Expand All @@ -453,7 +466,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
)
gr.Add(
func() error {
return m.Run(discoveryManager.SyncCh())
var err error

pprof.Do(ctx, pprof.Labels("parca_component", "scraper"), func(_ context.Context) {
err = m.Run(discoveryManager.SyncCh())
})

return err
},
func(_ error) {
level.Debug(logger).Log("msg", "scrape manager exiting")
Expand All @@ -462,7 +481,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
)
gr.Add(
func() error {
return cfgReloader.Run(ctx)
var err error

pprof.Do(ctx, pprof.Labels("parca_component", "config_reloader"), func(ctx context.Context) {
err = cfgReloader.Run(ctx)
})

return err
},
func(_ error) {
level.Debug(logger).Log("msg", "config file reloader exiting")
Expand All @@ -472,42 +497,48 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
parcaserver := server.NewServer(reg, version)
gr.Add(
func() error {
return parcaserver.ListenAndServe(
ctx,
logger,
flags.HTTPAddress,
flags.CORSAllowedOrigins,
flags.PathPrefix,
server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {
debuginfopb.RegisterDebuginfoServiceServer(srv, dbginfo)
profilestorepb.RegisterProfileStoreServiceServer(srv, s)
profilestorepb.RegisterAgentsServiceServer(srv, s)
querypb.RegisterQueryServiceServer(srv, q)
scrapepb.RegisterScrapeServiceServer(srv, m)

if err := debuginfopb.RegisterDebuginfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := profilestorepb.RegisterAgentsServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

return nil
}),
)
var err error

pprof.Do(ctx, pprof.Labels("parca_component", "http_server"), func(ctx context.Context) {
err = parcaserver.ListenAndServe(
ctx,
logger,
flags.HTTPAddress,
flags.CORSAllowedOrigins,
flags.PathPrefix,
server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {
debuginfopb.RegisterDebuginfoServiceServer(srv, dbginfo)
profilestorepb.RegisterProfileStoreServiceServer(srv, s)
profilestorepb.RegisterAgentsServiceServer(srv, s)
querypb.RegisterQueryServiceServer(srv, q)
scrapepb.RegisterScrapeServiceServer(srv, m)

if err := debuginfopb.RegisterDebuginfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := profilestorepb.RegisterAgentsServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
return err
}

return nil
}),
)
})

return err
},
func(_ error) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // TODO make this a graceful shutdown config setting
Expand Down
104 changes: 104 additions & 0 deletions pkg/parca/parca_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,107 @@ func TestPGOE2e(t *testing.T) {
require.NoError(t, os.WriteFile("./testdata/pgotest.res.prof", rawPprof, 0o644))
runCmd(t, "go", "build", "-pgo", "./testdata/pgotest.res.prof", "-o", "./testdata/pgotest", "./testdata/pgotest.go")
}

func TestLabels(t *testing.T) {
t.Parallel()

ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
tracer := trace.NewNoopTracerProvider().Tracer("")
col, err := frostdb.New()
require.NoError(t, err)
colDB, err := col.DB(context.Background(), "parca")
require.NoError(t, err)

schema, err := parcacol.Schema()
require.NoError(t, err)

table, err := colDB.Table(
"labels",
frostdb.NewTableConfig(parcacol.SchemaDefinition()),
)
require.NoError(t, err)
m := metastoretest.NewTestMetastore(
t,
logger,
reg,
tracer,
)

metastore := metastore.NewInProcessClient(m)

fileContent, err := os.ReadFile("testdata/labels.pb.gz")
require.NoError(t, err)

store := profilestore.NewProfileColumnStore(
logger,
tracer,
metastore,
table,
schema,
true,
)

_, err = store.WriteRaw(ctx, &profilestorepb.WriteRawRequest{
Series: []*profilestorepb.RawProfileSeries{{
Labels: &profilestorepb.LabelSet{
Labels: []*profilestorepb.Label{
{
Name: "__name__",
Value: "process_cpu",
},
{
Name: "job",
Value: "default",
},
},
},
Samples: []*profilestorepb.RawSample{{
RawProfile: fileContent,
}},
}},
})
require.NoError(t, err)

require.NoError(t, table.EnsureCompaction())
api := queryservice.NewColumnQueryAPI(
logger,
tracer,
getShareServerConn(t),
parcacol.NewQuerier(
logger,
tracer,
query.NewEngine(
memory.DefaultAllocator,
colDB.TableProvider(),
),
"labels",
metastore,
),
)

ts := timestamppb.New(timestamp.Time(1677488315039)) // time_nanos of the profile divided by 1e6
res, err := api.Query(ctx, &querypb.QueryRequest{
ReportType: querypb.QueryRequest_REPORT_TYPE_PPROF,
Options: &querypb.QueryRequest_Single{
Single: &querypb.SingleProfile{
Query: `process_cpu:samples:count:cpu:nanoseconds:delta`,
Time: ts,
},
},
})
require.NoError(t, err)

resPprof, err := profile.ParseData(res.Report.(*querypb.QueryResponse_Pprof).Pprof)
require.NoError(t, err)

got := make(map[string]struct{})
for _, s := range resPprof.Sample {
for l := range s.Label {
got[l] = struct{}{}
}
}
want := map[string]struct{}{"api": {}}
require.Equal(t, want, got, "profile should contain pprof_labels from the original profile only")
}
Binary file added pkg/parca/testdata/labels.pb.gz
Binary file not shown.
21 changes: 21 additions & 0 deletions pkg/parcacol/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package parcacol
import (
"context"
"fmt"
"strings"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
Expand Down Expand Up @@ -88,10 +89,30 @@ func (c *ArrowToProfileConverter) Convert(
return nil, fmt.Errorf("read stacktrace metadata: %w", err)
}

labelIndexes := make(map[string]int)
for i, field := range schema.Fields() {
if strings.HasPrefix(field.Name, ColumnPprofLabels+".") {
labelIndexes[strings.TrimPrefix(field.Name, ColumnPprofLabels+".")] = i
}
}

for i := 0; i < rows; i++ {
labels := make(map[string]string, len(labelIndexes))
for name, index := range labelIndexes {
c := ar.Column(index).(*array.Dictionary)
d := c.Dictionary().(*array.Binary)
if !c.IsNull(i) {
labelValue := d.Value(c.GetValueIndex(i))
if len(labelValue) > 0 {
labels[name] = string(labelValue)
}
}
}

samples = append(samples, &profile.SymbolizedSample{
Value: valueColumn.Value(i),
Locations: stacktraceLocations[i],
Label: labels,
})
}
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/parcacol/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strconv"
"strings"

"github.com/prometheus/common/model"

pprofpb "github.com/parca-dev/parca/gen/proto/go/google/pprof"
pb "github.com/parca-dev/parca/gen/proto/go/parca/metastore/v1alpha1"
"github.com/parca-dev/parca/pkg/profile"
Expand Down Expand Up @@ -161,10 +163,10 @@ func LabelNamesFromSamples(
for labelName := range labels {
resLabelName := labelName
if _, ok := takenLabels[labelName]; ok {
resLabelName = "exported_" + resLabelName
resLabelName = model.ExportedLabelPrefix + resLabelName
}
if _, ok := resLabels[resLabelName]; !ok {
resLabelName = "exported_" + resLabelName
if _, ok := resLabels[resLabelName]; ok {
resLabelName = model.ExportedLabelPrefix + resLabelName
}
resLabels[resLabelName] = struct{}{}
}
Expand All @@ -186,7 +188,7 @@ func LabelNamesFromSamples(
}

// TODO: support num label units.
func LabelsFromSample(takenLabelNames map[string]string, stringTable []string, plabels []*pprofpb.Label) (map[string]string, map[string]int64) {
func LabelsFromSample(takenLabels map[string]string, stringTable []string, plabels []*pprofpb.Label) (map[string]string, map[string]int64) {
labels := map[string][]string{}
labelNames := []string{}
for _, label := range plabels {
Expand All @@ -207,11 +209,11 @@ func LabelsFromSample(takenLabelNames map[string]string, stringTable []string, p
resLabels := map[string]string{}
for _, labelName := range labelNames {
resLabelName := labelName
if _, ok := takenLabelNames[resLabelName]; ok {
resLabelName = "exported_" + resLabelName
if _, ok := takenLabels[resLabelName]; ok {
resLabelName = model.ExportedLabelPrefix + resLabelName
}
if _, ok := resLabels[resLabelName]; ok {
resLabelName = "exported_" + resLabelName
resLabelName = model.ExportedLabelPrefix + resLabelName
}
resLabels[resLabelName] = labels[labelName][0]
}
Expand Down
Loading