Skip to content

Commit

Permalink
Dependency inject the memory.Allocator for rendering (#3461)
Browse files Browse the repository at this point in the history
  • Loading branch information
thorfour authored Jul 17, 2023
1 parent e4d0523 commit 4865f0e
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/parca/parca.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)

ctx, cancel := context.WithCancel(ctx)
Expand Down
3 changes: 3 additions & 0 deletions pkg/parca/parca_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func TestConsistency(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)

ts := timestamppb.New(timestamp.Time(1608199718549)) // time_nanos of the profile divided by 1e6
Expand Down Expand Up @@ -366,6 +367,7 @@ func TestPGOE2e(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)

res, err := api.Query(ctx, &querypb.QueryRequest{
Expand Down Expand Up @@ -464,6 +466,7 @@ func TestLabels(t *testing.T) {
"labels",
metastore,
),
memory.DefaultAllocator,
)

ts := timestamppb.New(timestamp.Time(1677488315039)) // time_nanos of the profile divided by 1e6
Expand Down
9 changes: 7 additions & 2 deletions pkg/query/columnquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/go-kit/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -54,20 +55,23 @@ type ColumnQueryAPI struct {
querier Querier

tableConverterPool *sync.Pool
mem memory.Allocator
}

func NewColumnQueryAPI(
logger log.Logger,
tracer trace.Tracer,
shareClient sharepb.ShareServiceClient,
querier Querier,
mem memory.Allocator,
) *ColumnQueryAPI {
return &ColumnQueryAPI{
logger: logger,
tracer: tracer,
shareClient: shareClient,
querier: querier,
tableConverterPool: NewTableConverterPool(),
mem: mem,
}
}

Expand Down Expand Up @@ -237,7 +241,7 @@ func (q *ColumnQueryAPI) renderReport(
nodeTrimThreshold float32,
filtered int64,
) (*pb.QueryResponse, error) {
return RenderReport(ctx, q.tracer, p, typ, nodeTrimThreshold, filtered, q.tableConverterPool)
return RenderReport(ctx, q.tracer, p, typ, nodeTrimThreshold, filtered, q.tableConverterPool, q.mem)
}

func RenderReport(
Expand All @@ -248,6 +252,7 @@ func RenderReport(
nodeTrimThreshold float32,
filtered int64,
pool *sync.Pool,
mem memory.Allocator,
) (*pb.QueryResponse, error) {
ctx, span := tracer.Start(ctx, "renderReport")
span.SetAttributes(attribute.String("reportType", typ.String()))
Expand Down Expand Up @@ -287,7 +292,7 @@ func RenderReport(
}, nil
case pb.QueryRequest_REPORT_TYPE_FLAMEGRAPH_ARROW:
// TODO: Make the fields to aggregate by configurable via the API.
fa, total, err := GenerateFlamegraphArrow(ctx, tracer, p, []string{FlamegraphFieldFunctionName}, nodeTrimFraction)
fa, total, err := GenerateFlamegraphArrow(ctx, mem, tracer, p, []string{FlamegraphFieldFunctionName}, nodeTrimFraction)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to generate arrow flamegraph: %v", err.Error())
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/query/columnquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func TestColumnQueryAPIQueryRangeEmpty(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
_, err = api.QueryRange(ctx, &pb.QueryRangeRequest{
Query: `memory:alloc_objects:count:space:bytes{job="default"}`,
Expand Down Expand Up @@ -219,6 +220,7 @@ func TestColumnQueryAPIQueryRange(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
res, err := api.QueryRange(ctx, &pb.QueryRangeRequest{
Query: `memory:alloc_objects:count:space:bytes{job="default"}`,
Expand Down Expand Up @@ -313,6 +315,7 @@ func TestColumnQueryAPIQuerySingle(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
ts := timestamppb.New(timestamp.Time(p.TimeNanos / time.Millisecond.Nanoseconds()))
res, err := api.Query(ctx, &pb.QueryRequest{
Expand Down Expand Up @@ -441,6 +444,7 @@ func TestColumnQueryAPIQueryFgprof(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)

res, err := api.QueryRange(ctx, &pb.QueryRangeRequest{
Expand Down Expand Up @@ -542,6 +546,7 @@ func TestColumnQueryAPIQueryCumulative(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)

// These have been extracted from the profiles above.
Expand Down Expand Up @@ -751,6 +756,7 @@ func TestColumnQueryAPIQueryDiff(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)

res, err := api.Query(ctx, &pb.QueryRequest{
Expand Down Expand Up @@ -932,6 +938,7 @@ func TestColumnQueryAPITypes(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
res, err := api.ProfileTypes(ctx, &pb.ProfileTypesRequest{})
require.NoError(t, err)
Expand Down Expand Up @@ -1024,6 +1031,7 @@ func TestColumnQueryAPILabelNames(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
res, err := api.Labels(ctx, &pb.LabelsRequest{})
require.NoError(t, err)
Expand Down Expand Up @@ -1108,6 +1116,7 @@ func TestColumnQueryAPILabelValues(t *testing.T) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
res, err := api.Values(ctx, &pb.ValuesRequest{
LabelName: "job",
Expand Down Expand Up @@ -1136,7 +1145,7 @@ func BenchmarkQuery(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _ = RenderReport(ctx, tracer, sp, pb.QueryRequest_REPORT_TYPE_FLAMEGRAPH_ARROW, 0, 0, NewTableConverterPool())
_, _ = RenderReport(ctx, tracer, sp, pb.QueryRequest_REPORT_TYPE_FLAMEGRAPH_ARROW, 0, 0, NewTableConverterPool(), memory.DefaultAllocator)
}
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/query/flamegraph_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ const (
FlamegraphFieldDiff = "diff"
)

func GenerateFlamegraphArrow(ctx context.Context, tracer trace.Tracer, p *profile.Profile, aggregate []string, trimFraction float32) (*queryv1alpha1.FlamegraphArrow, int64, error) {
mem := memory.NewGoAllocator()
func GenerateFlamegraphArrow(ctx context.Context, mem memory.Allocator, tracer trace.Tracer, p *profile.Profile, aggregate []string, trimFraction float32) (*queryv1alpha1.FlamegraphArrow, int64, error) {
record, cumulative, height, trimmed, err := generateFlamegraphArrowRecord(ctx, mem, tracer, p, aggregate, trimFraction)
if err != nil {
return nil, 0, err
Expand Down
2 changes: 2 additions & 0 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func Benchmark_Query_Merge(b *testing.B) {
"stacktraces",
metastore,
),
memory.DefaultAllocator,
)
b.ResetTimer()

Expand Down Expand Up @@ -185,6 +186,7 @@ func Benchmark_ProfileTypes(b *testing.B) {
"stacktraces",
m,
),
memory.DefaultAllocator,
)
b.ResetTimer()

Expand Down

0 comments on commit 4865f0e

Please sign in to comment.