diff --git a/pkg/parca/parca.go b/pkg/parca/parca.go index 727d52c61aa..db2114f7ff4 100644 --- a/pkg/parca/parca.go +++ b/pkg/parca/parca.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" goruntime "runtime" + "runtime/pprof" "strings" "syscall" "time" @@ -435,7 +436,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags ctx, cancel := context.WithCancel(ctx) gr.Add( func() error { - return s.Run(ctx, symbolizationInterval) + var err error + + pprof.Do(ctx, pprof.Labels("parca_component", "symbolizer"), func(ctx context.Context) { + err = s.Run(ctx, symbolizationInterval) + }) + + return err }, func(_ error) { level.Debug(logger).Log("msg", "symbolizer server shutting down") @@ -444,7 +451,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags } gr.Add( func() error { - return discoveryManager.Run() + var err error + + pprof.Do(ctx, pprof.Labels("parca_component", "discovery"), func(_ context.Context) { + err = discoveryManager.Run() + }) + + return err }, func(_ error) { level.Debug(logger).Log("msg", "discovery manager exiting") @@ -453,7 +466,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags ) gr.Add( func() error { - return m.Run(discoveryManager.SyncCh()) + var err error + + pprof.Do(ctx, pprof.Labels("parca_component", "scraper"), func(_ context.Context) { + err = m.Run(discoveryManager.SyncCh()) + }) + + return err }, func(_ error) { level.Debug(logger).Log("msg", "scrape manager exiting") @@ -462,7 +481,13 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags ) gr.Add( func() error { - return cfgReloader.Run(ctx) + var err error + + pprof.Do(ctx, pprof.Labels("parca_component", "config_reloader"), func(ctx context.Context) { + err = cfgReloader.Run(ctx) + }) + + return err }, func(_ error) { level.Debug(logger).Log("msg", "config file reloader exiting") @@ -472,42 +497,48 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags parcaserver := server.NewServer(reg, version) gr.Add( func() error { - return parcaserver.ListenAndServe( - ctx, - logger, - flags.HTTPAddress, - flags.CORSAllowedOrigins, - flags.PathPrefix, - server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { - debuginfopb.RegisterDebuginfoServiceServer(srv, dbginfo) - profilestorepb.RegisterProfileStoreServiceServer(srv, s) - profilestorepb.RegisterAgentsServiceServer(srv, s) - querypb.RegisterQueryServiceServer(srv, q) - scrapepb.RegisterScrapeServiceServer(srv, m) - - if err := debuginfopb.RegisterDebuginfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { - return err - } - - if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { - return err - } - - if err := profilestorepb.RegisterAgentsServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { - return err - } - - if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { - return err - } - - if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { - return err - } - - return nil - }), - ) + var err error + + pprof.Do(ctx, pprof.Labels("parca_component", "http_server"), func(ctx context.Context) { + err = parcaserver.ListenAndServe( + ctx, + logger, + flags.HTTPAddress, + flags.CORSAllowedOrigins, + flags.PathPrefix, + server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { + debuginfopb.RegisterDebuginfoServiceServer(srv, dbginfo) + profilestorepb.RegisterProfileStoreServiceServer(srv, s) + profilestorepb.RegisterAgentsServiceServer(srv, s) + querypb.RegisterQueryServiceServer(srv, q) + scrapepb.RegisterScrapeServiceServer(srv, m) + + if err := debuginfopb.RegisterDebuginfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { + return err + } + + if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { + return err + } + + if err := profilestorepb.RegisterAgentsServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { + return err + } + + if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { + return err + } + + if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil { + return err + } + + return nil + }), + ) + }) + + return err }, func(_ error) { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // TODO make this a graceful shutdown config setting diff --git a/pkg/parca/parca_test.go b/pkg/parca/parca_test.go index 3336db62439..b1826d58be8 100644 --- a/pkg/parca/parca_test.go +++ b/pkg/parca/parca_test.go @@ -386,3 +386,107 @@ func TestPGOE2e(t *testing.T) { require.NoError(t, os.WriteFile("./testdata/pgotest.res.prof", rawPprof, 0o644)) runCmd(t, "go", "build", "-pgo", "./testdata/pgotest.res.prof", "-o", "./testdata/pgotest", "./testdata/pgotest.go") } + +func TestLabels(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + tracer := trace.NewNoopTracerProvider().Tracer("") + col, err := frostdb.New() + require.NoError(t, err) + colDB, err := col.DB(context.Background(), "parca") + require.NoError(t, err) + + schema, err := parcacol.Schema() + require.NoError(t, err) + + table, err := colDB.Table( + "labels", + frostdb.NewTableConfig(parcacol.SchemaDefinition()), + ) + require.NoError(t, err) + m := metastoretest.NewTestMetastore( + t, + logger, + reg, + tracer, + ) + + metastore := metastore.NewInProcessClient(m) + + fileContent, err := os.ReadFile("testdata/labels.pb.gz") + require.NoError(t, err) + + store := profilestore.NewProfileColumnStore( + logger, + tracer, + metastore, + table, + schema, + true, + ) + + _, err = store.WriteRaw(ctx, &profilestorepb.WriteRawRequest{ + Series: []*profilestorepb.RawProfileSeries{{ + Labels: &profilestorepb.LabelSet{ + Labels: []*profilestorepb.Label{ + { + Name: "__name__", + Value: "process_cpu", + }, + { + Name: "job", + Value: "default", + }, + }, + }, + Samples: []*profilestorepb.RawSample{{ + RawProfile: fileContent, + }}, + }}, + }) + require.NoError(t, err) + + require.NoError(t, table.EnsureCompaction()) + api := queryservice.NewColumnQueryAPI( + logger, + tracer, + getShareServerConn(t), + parcacol.NewQuerier( + logger, + tracer, + query.NewEngine( + memory.DefaultAllocator, + colDB.TableProvider(), + ), + "labels", + metastore, + ), + ) + + ts := timestamppb.New(timestamp.Time(1677488315039)) // time_nanos of the profile divided by 1e6 + res, err := api.Query(ctx, &querypb.QueryRequest{ + ReportType: querypb.QueryRequest_REPORT_TYPE_PPROF, + Options: &querypb.QueryRequest_Single{ + Single: &querypb.SingleProfile{ + Query: `process_cpu:samples:count:cpu:nanoseconds:delta`, + Time: ts, + }, + }, + }) + require.NoError(t, err) + + resPprof, err := profile.ParseData(res.Report.(*querypb.QueryResponse_Pprof).Pprof) + require.NoError(t, err) + + got := make(map[string]struct{}) + for _, s := range resPprof.Sample { + for l := range s.Label { + got[l] = struct{}{} + } + } + want := map[string]struct{}{"api": {}} + require.Equal(t, want, got, "profile should contain pprof_labels from the original profile only") +} diff --git a/pkg/parca/testdata/labels.pb.gz b/pkg/parca/testdata/labels.pb.gz new file mode 100644 index 00000000000..88a82b834f9 Binary files /dev/null and b/pkg/parca/testdata/labels.pb.gz differ diff --git a/pkg/parcacol/arrow.go b/pkg/parcacol/arrow.go index 39f5df2e82d..7401e204762 100644 --- a/pkg/parcacol/arrow.go +++ b/pkg/parcacol/arrow.go @@ -16,6 +16,7 @@ package parcacol import ( "context" "fmt" + "strings" "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" @@ -88,10 +89,30 @@ func (c *ArrowToProfileConverter) Convert( return nil, fmt.Errorf("read stacktrace metadata: %w", err) } + labelIndexes := make(map[string]int) + for i, field := range schema.Fields() { + if strings.HasPrefix(field.Name, ColumnPprofLabels+".") { + labelIndexes[strings.TrimPrefix(field.Name, ColumnPprofLabels+".")] = i + } + } + for i := 0; i < rows; i++ { + labels := make(map[string]string, len(labelIndexes)) + for name, index := range labelIndexes { + c := ar.Column(index).(*array.Dictionary) + d := c.Dictionary().(*array.Binary) + if !c.IsNull(i) { + labelValue := d.Value(c.GetValueIndex(i)) + if len(labelValue) > 0 { + labels[name] = string(labelValue) + } + } + } + samples = append(samples, &profile.SymbolizedSample{ Value: valueColumn.Value(i), Locations: stacktraceLocations[i], + Label: labels, }) } } diff --git a/pkg/parcacol/normalizer.go b/pkg/parcacol/normalizer.go index 0b7ac42a597..af7a83043de 100644 --- a/pkg/parcacol/normalizer.go +++ b/pkg/parcacol/normalizer.go @@ -20,6 +20,8 @@ import ( "strconv" "strings" + "github.com/prometheus/common/model" + pprofpb "github.com/parca-dev/parca/gen/proto/go/google/pprof" pb "github.com/parca-dev/parca/gen/proto/go/parca/metastore/v1alpha1" "github.com/parca-dev/parca/pkg/profile" @@ -161,10 +163,10 @@ func LabelNamesFromSamples( for labelName := range labels { resLabelName := labelName if _, ok := takenLabels[labelName]; ok { - resLabelName = "exported_" + resLabelName + resLabelName = model.ExportedLabelPrefix + resLabelName } - if _, ok := resLabels[resLabelName]; !ok { - resLabelName = "exported_" + resLabelName + if _, ok := resLabels[resLabelName]; ok { + resLabelName = model.ExportedLabelPrefix + resLabelName } resLabels[resLabelName] = struct{}{} } @@ -186,7 +188,7 @@ func LabelNamesFromSamples( } // TODO: support num label units. -func LabelsFromSample(takenLabelNames map[string]string, stringTable []string, plabels []*pprofpb.Label) (map[string]string, map[string]int64) { +func LabelsFromSample(takenLabels map[string]string, stringTable []string, plabels []*pprofpb.Label) (map[string]string, map[string]int64) { labels := map[string][]string{} labelNames := []string{} for _, label := range plabels { @@ -207,11 +209,11 @@ func LabelsFromSample(takenLabelNames map[string]string, stringTable []string, p resLabels := map[string]string{} for _, labelName := range labelNames { resLabelName := labelName - if _, ok := takenLabelNames[resLabelName]; ok { - resLabelName = "exported_" + resLabelName + if _, ok := takenLabels[resLabelName]; ok { + resLabelName = model.ExportedLabelPrefix + resLabelName } if _, ok := resLabels[resLabelName]; ok { - resLabelName = "exported_" + resLabelName + resLabelName = model.ExportedLabelPrefix + resLabelName } resLabels[resLabelName] = labels[labelName][0] } diff --git a/pkg/parcacol/normalizer_test.go b/pkg/parcacol/normalizer_test.go index 1cd0e9d04fd..7a71fd06fef 100644 --- a/pkg/parcacol/normalizer_test.go +++ b/pkg/parcacol/normalizer_test.go @@ -21,55 +21,126 @@ import ( pprofpb "github.com/parca-dev/parca/gen/proto/go/google/pprof" ) +func TestLabelNamesFromSamples(t *testing.T) { + cases := []struct { + name string + takenLabels map[string]string + stringTable []string + samples []*pprofpb.Sample + allLabels map[string]struct{} + allNumLabels map[string]struct{} + }{ + { + name: "colliding labels in descending order", + takenLabels: map[string]string{ + "instance": "127.0.0.1:6060", + }, + stringTable: []string{"", "instance", "17", "method", "GET"}, + samples: []*pprofpb.Sample{ + { + Label: []*pprofpb.Label{{ + Key: 3, + Str: 4, + }, { + Key: 1, + Str: 2, + }}, + }, + }, + allLabels: map[string]struct{}{ + "exported_instance": {}, + "method": {}, + }, + allNumLabels: map[string]struct{}{}, + }, + { + name: "colliding labels in ascending order", + takenLabels: map[string]string{ + "instance": "127.0.0.1:6060", + }, + stringTable: []string{"", "instance", "17", "method", "GET"}, + samples: []*pprofpb.Sample{ + { + Label: []*pprofpb.Label{{ + Key: 1, + Str: 2, + }, { + Key: 3, + Str: 4, + }}, + }, + }, + allLabels: map[string]struct{}{ + "exported_instance": {}, + "method": {}, + }, + allNumLabels: map[string]struct{}{}, + }, + } + + for _, c := range cases { + t.Run("", func(t *testing.T) { + allLabels := map[string]struct{}{} + allNumLabels := map[string]struct{}{} + LabelNamesFromSamples(c.takenLabels, c.stringTable, c.samples, allLabels, allNumLabels) + require.Equal(t, c.allLabels, allLabels) + require.Equal(t, c.allNumLabels, allNumLabels) + }) + } +} + func TestLabelsFromSample(t *testing.T) { cases := []struct { name string takenLabels map[string]string stringTable []string - samples []*pprofpb.Label + labels []*pprofpb.Label resultLabels map[string]string resultNumLabels map[string]int64 - }{{ - name: "descending order", - takenLabels: map[string]string{ - "foo": "bar", - }, - stringTable: []string{"", "foo", "bar", "exported_foo", "baz"}, - samples: []*pprofpb.Label{{ - Key: 1, - Str: 2, - }, { - Key: 3, - Str: 4, - }}, - resultLabels: map[string]string{ - "exported_foo": "baz", - "exported_exported_foo": "bar", - }, - resultNumLabels: map[string]int64{}, - }, { - name: "ascending order", - takenLabels: map[string]string{ - "a": "b", + }{ + { + name: "colliding labels in descending order", + takenLabels: map[string]string{ + "instance": "127.0.0.1:6060", + }, + stringTable: []string{"", "instance", "17", "method", "GET"}, + labels: []*pprofpb.Label{{ + Key: 3, + Str: 4, + }, { + Key: 1, + Str: 2, + }}, + resultLabels: map[string]string{ + "exported_instance": "17", + "method": "GET", + }, + resultNumLabels: map[string]int64{}, }, - stringTable: []string{"", "a", "bar", "exported_a", "baz"}, - samples: []*pprofpb.Label{{ - Key: 1, - Str: 2, - }, { - Key: 3, - Str: 4, - }}, - resultLabels: map[string]string{ - "exported_a": "bar", - "exported_exported_a": "baz", + { + name: "colliding labels in ascending order", + takenLabels: map[string]string{ + "instance": "127.0.0.1:6060", + }, + stringTable: []string{"", "instance", "17", "method", "GET"}, + labels: []*pprofpb.Label{{ + Key: 1, + Str: 2, + }, { + Key: 3, + Str: 4, + }}, + resultLabels: map[string]string{ + "exported_instance": "17", + "method": "GET", + }, + resultNumLabels: map[string]int64{}, }, - resultNumLabels: map[string]int64{}, - }} + } for _, c := range cases { t.Run("", func(t *testing.T) { - labels, numLabels := LabelsFromSample(c.takenLabels, c.stringTable, c.samples) + labels, numLabels := LabelsFromSample(c.takenLabels, c.stringTable, c.labels) require.Equal(t, c.resultLabels, labels) require.Equal(t, c.resultNumLabels, numLabels) }) diff --git a/pkg/parcacol/querier.go b/pkg/parcacol/querier.go index ac5040102c1..05bd661b46f 100644 --- a/pkg/parcacol/querier.go +++ b/pkg/parcacol/querier.go @@ -74,6 +74,11 @@ type Querier struct { tracer trace.Tracer } +const ( + ColumnLabelsPrefix = ColumnLabels + "." + ColumnPprofLabelsPrefix = ColumnPprofLabels + "." +) + func (q *Querier) Labels( ctx context.Context, match []string, @@ -169,7 +174,25 @@ func (q *Querier) Values( } func MatcherToBooleanExpression(matcher *labels.Matcher) (logicalplan.Expr, error) { - ref := logicalplan.Col("labels." + matcher.Name) + label := logicalplan.Col(ColumnLabelsPrefix + matcher.Name) + labelExpr, err := matcherToBinaryExpression(matcher, label) + if err != nil { + return nil, err + } + pprofLabel := logicalplan.Col(ColumnPprofLabelsPrefix + matcher.Name) + pprofLabelExpr, err := matcherToBinaryExpression(matcher, pprofLabel) + if err != nil { + return nil, err + } + + return logicalplan.Or( + logicalplan.And(pprofLabel.Eq(&logicalplan.LiteralExpr{Value: scalar.ScalarNull}), labelExpr), + logicalplan.And(label.Eq(&logicalplan.LiteralExpr{Value: scalar.ScalarNull}), pprofLabelExpr), + ), + nil +} + +func matcherToBinaryExpression(matcher *labels.Matcher, ref *logicalplan.Column) (*logicalplan.BinaryExpr, error) { switch matcher.Type { case labels.MatchEqual: if matcher.Value == "" { diff --git a/pkg/query/pprof.go b/pkg/query/pprof.go index c3bb704f822..a8b2a644a22 100644 --- a/pkg/query/pprof.go +++ b/pkg/query/pprof.go @@ -152,9 +152,15 @@ func GenerateFlatPprof(ctx context.Context, ip *parcaprofile.Profile) (*profile. s.Value = s.DiffValue } + labels := make(map[string][]string, len(s.Label)) + for k, v := range s.Label { + labels[k] = []string{v} + } + p.Sample = append(p.Sample, &profile.Sample{ Value: []int64{s.Value}, Location: locations, + Label: labels, }) }