chore: disable stack trace range chunking #3583

Merged · 1 commit · Sep 24, 2024
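This change removes chunking of the stack trace store in symdb: previously a partition's stacktrace tree was split into chunks of at most MaxNodesPerChunk nodes (4 << 20 in the ingester head), and a stacktrace ID encoded both the chunk index and the position inside that chunk's tree. With this PR a partition keeps a single tree and the stacktrace ID is simply the node index. A minimal sketch of the old ID arithmetic for contrast; splitChunkedID is a hypothetical helper, not part of symdb:

```go
package main

import "fmt"

// The constant mirrors the former 4 << 20 MaxNodesPerChunk default used by the
// ingester head; splitChunkedID is illustrative only.
const maxNodesPerChunk = 4 << 20

// splitChunkedID shows how the old lookup path decomposed a global stacktrace
// ID into a chunk index and a chunk-local ID.
func splitChunkedID(stacktraceID uint32) (chunk, local uint32) {
	return stacktraceID / maxNodesPerChunk, stacktraceID % maxNodesPerChunk
}

func main() {
	id := uint32(4<<20 + 42)
	chunk, local := splitChunkedID(id)
	fmt.Println(chunk, local) // old scheme: chunk 1, local ID 42
	// After this PR each partition holds one stacktrace tree, so the ID is
	// used directly as the node index: no chunk/local split, no ID ranges.
	fmt.Println(id)
}
```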
3 changes: 0 additions & 3 deletions pkg/experiment/ingester/memdb/head.go
@@ -49,9 +49,6 @@ func NewHead(metrics *HeadMetrics) *Head {
metrics: metrics,
symbols: symdb.NewPartitionWriter(0, &symdb.Config{
Version: symdb.FormatV3,
Stacktraces: symdb.StacktracesConfig{
MaxNodesPerChunk: 4 << 20,
},
}),
totalSamples: atomic.NewUint64(0),
minTimeNanos: math.MaxInt64,
5 changes: 1 addition & 4 deletions pkg/phlaredb/compact.go
@@ -750,10 +750,7 @@ func newSymbolsCompactor(path string, version symdb.FormatVersion) *symbolsCompa
version: symdb.FormatV2,
w: symdb.NewSymDB(symdb.DefaultConfig().
WithVersion(symdb.FormatV2).
WithDirectory(dst).
WithParquetConfig(symdb.ParquetConfig{
MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount,
})),
WithDirectory(dst)),
dst: dst,
rewriters: make(map[BlockReader]*symdb.Rewriter),
}
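The compactor no longer passes an explicit ParquetConfig: the override only re-applied defaultParquetConfig.MaxBufferRowCount, and (as the block_writer_v2.go change below shows) that value is no longer used to size the parquet buffer, so only the version and destination directory need to be set. A generic sketch of this fluent-config style, with illustrative names rather than the real symdb API, to show why a call site can drop an override that merely restates the default:

```go
package main

import "fmt"

// Config and its With* methods are illustrative stand-ins for the
// symdb.DefaultConfig().WithVersion(...).WithDirectory(...) pattern; the
// default values here are assumptions, not the real defaults.
type Config struct {
	Version           int
	Directory         string
	MaxBufferRowCount int
}

func DefaultConfig() *Config {
	return &Config{Version: 3, MaxBufferRowCount: 100_000}
}

func (c *Config) WithVersion(v int) *Config        { c.Version = v; return c }
func (c *Config) WithDirectory(dir string) *Config { c.Directory = dir; return c }

func main() {
	// The defaults travel with the base config, so the call site only sets
	// what actually differs from them.
	cfg := DefaultConfig().WithVersion(2).WithDirectory("/tmp/symdb")
	fmt.Printf("%+v\n", cfg)
}
```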
42 changes: 16 additions & 26 deletions pkg/phlaredb/symdb/block_writer_v2.go
@@ -5,6 +5,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"

@@ -15,7 +16,6 @@ import (
"github.com/grafana/pyroscope/pkg/phlaredb/block"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/util/build"
"github.com/grafana/pyroscope/pkg/util/math"
)

type writerV2 struct {
@@ -138,30 +138,20 @@ func (w *writerV2) Flush() (err error) {
}

func (w *writerV2) writeStacktraces(partition *PartitionWriter) (err error) {
for ci, c := range partition.stacktraces.chunks {
stacks := c.stacks
if stacks == 0 {
stacks = uint32(len(partition.stacktraces.hashToIdx))
}
h := StacktraceBlockHeader{
Offset: w.stacktraces.w.offset,
Size: 0, // Set later.
Partition: partition.header.Partition,
BlockIndex: uint16(ci),
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: stacks,
StacktraceNodes: c.tree.len(),
StacktraceMaxDepth: 0, // TODO
StacktraceMaxNodes: c.partition.maxNodesPerChunk,
CRC: 0, // Set later.
}
crc := crc32.New(castagnoli)
if h.Size, err = c.WriteTo(io.MultiWriter(crc, w.stacktraces)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
partition.header.Stacktraces = append(partition.header.Stacktraces, h)
h := StacktraceBlockHeader{
Offset: w.stacktraces.w.offset,
Partition: partition.header.Partition,
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: uint32(len(partition.stacktraces.hashToIdx)),
StacktraceNodes: partition.stacktraces.tree.len(),
StacktraceMaxNodes: math.MaxUint32,
}
crc := crc32.New(castagnoli)
if h.Size, err = partition.stacktraces.WriteTo(io.MultiWriter(crc, w.stacktraces)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
partition.header.Stacktraces = append(partition.header.Stacktraces, h)
return nil
}
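With a single tree per partition, writeStacktraces now emits exactly one StacktraceBlockHeader, with StacktraceMaxNodes set to math.MaxUint32, presumably as a "no limit" marker; the block size and CRC are produced in one pass by streaming the tree through an io.MultiWriter. A self-contained sketch of that write pattern, where the byte payload stands in for the tree's WriteTo and castagnoli is assumed to be the Castagnoli CRC-32 table, matching the variable name in the writer:

```go
package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
)

// writeBlock shows the single-pass size-and-checksum pattern with a stand-in
// payload; the real code streams the stacktrace tree instead.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

func writeBlock(dst io.Writer, payload []byte) (size int64, sum uint32, err error) {
	crc := crc32.New(castagnoli)
	// Stream the payload once; the MultiWriter feeds both the checksum and
	// the destination, and io.Copy reports the byte count for header.Size.
	n, err := io.Copy(io.MultiWriter(crc, dst), bytes.NewReader(payload))
	if err != nil {
		return 0, 0, fmt.Errorf("writing stacktrace data: %w", err)
	}
	return n, crc.Sum32(), nil
}

func main() {
	var buf bytes.Buffer
	size, sum, err := writeBlock(&buf, []byte("stacktrace tree bytes"))
	if err != nil {
		panic(err)
	}
	fmt.Println(size, sum) // would populate header.Size and header.CRC
}
```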

@@ -220,7 +210,7 @@ func (s *parquetWriter[M, P]) init(dir string, c ParquetConfig) (err error) {
return err
}
s.rowsBatch = make([]parquet.Row, 0, 128)
s.buffer = parquet.NewBuffer(s.persister.Schema(), parquet.ColumnBufferCapacity(s.config.MaxBufferRowCount))
s.buffer = parquet.NewBuffer(s.persister.Schema())
s.writer = parquet.NewGenericWriter[P](s.file, s.persister.Schema(),
parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision),
parquet.PageBufferSize(3*1024*1024),
@@ -265,7 +255,7 @@ func (s *parquetWriter[M, P]) writeRows(values []M) (r RowRangeReference, err e
}

func (s *parquetWriter[M, P]) fillBatch(values []M) int {
m := math.Min(len(values), cap(s.rowsBatch))
m := min(len(values), cap(s.rowsBatch))
s.rowsBatch = s.rowsBatch[:m]
for i := 0; i < m; i++ {
row := s.rowsBatch[i][:0]
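Two smaller cleanups land in the same file: the parquet buffer is now created without a ColumnBufferCapacity derived from MaxBufferRowCount, and the package-local math.Min helper is replaced by Go's builtin min (Go 1.21+), which is why the util/math import goes away. A minimal sketch of the fillBatch-style batching with the builtin, using placeholder element types:

```go
package main

import "fmt"

// fillBatch mirrors the batching shape in the diff: reuse a fixed-capacity
// batch slice and fill at most cap(batch) elements per call. The int/string
// element types are placeholders for the real parquet row types.
func fillBatch(batch []int, values []string) []int {
	m := min(len(values), cap(batch)) // builtin min replaces util/math.Min
	batch = batch[:m]
	for i := 0; i < m; i++ {
		batch[i] = len(values[i]) // stand-in for deconstructing a row
	}
	return batch
}

func main() {
	batch := make([]int, 0, 2)
	fmt.Println(fillBatch(batch, []string{"a", "bb", "ccc"})) // [1 2]: capped at cap(batch)
}
```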
36 changes: 16 additions & 20 deletions pkg/phlaredb/symdb/block_writer_v3.go
@@ -4,6 +4,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"

@@ -88,27 +89,22 @@ func writePartitionV3(w *writerOffset, e *encodersV3, p *PartitionWriter) (err e
if p.header.V3.Locations, err = writeSymbolsBlock(w, p.locations.slice, e.locationsEncoder); err != nil {
return err
}
for ci, c := range p.stacktraces.chunks {
stacks := c.stacks
if stacks == 0 {
stacks = uint32(len(p.stacktraces.hashToIdx))
}
h := StacktraceBlockHeader{
Offset: w.offset,
Partition: p.header.Partition,
BlockIndex: uint16(ci),
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: stacks,
StacktraceNodes: c.tree.len(),
StacktraceMaxNodes: c.partition.maxNodesPerChunk,
}
crc := crc32.New(castagnoli)
if h.Size, err = c.WriteTo(io.MultiWriter(crc, w)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
p.header.Stacktraces = append(p.header.Stacktraces, h)

h := StacktraceBlockHeader{
Offset: w.offset,
Partition: p.header.Partition,
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: uint32(len(p.stacktraces.hashToIdx)),
StacktraceNodes: p.stacktraces.tree.len(),
StacktraceMaxNodes: math.MaxUint32,
}
crc := crc32.New(castagnoli)
if h.Size, err = p.stacktraces.WriteTo(io.MultiWriter(crc, w)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
p.header.Stacktraces = append(p.header.Stacktraces, h)

return nil
}

156 changes: 24 additions & 132 deletions pkg/phlaredb/symdb/partition_memory.go
@@ -5,14 +5,13 @@ import (
"io"
"sync"

"github.com/grafana/pyroscope/pkg/iter"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
)

type PartitionWriter struct {
header PartitionHeader

stacktraces *stacktracesPartition
stacktraces *stacktraces
strings deduplicatingSlice[string, string, *stringsHelper]
mappings deduplicatingSlice[schemav1.InMemoryMapping, mappingsKey, *mappingsHelper]
functions deduplicatingSlice[schemav1.InMemoryFunction, functionsKey, *functionsHelper]
@@ -32,99 +31,36 @@ func (p *PartitionWriter) ResolveStacktraceLocations(_ context.Context, dst Stac

func (p *PartitionWriter) LookupLocations(dst []uint64, stacktraceID uint32) []uint64 {
dst = dst[:0]
if len(p.stacktraces.chunks) == 0 {
if stacktraceID == 0 {
return dst
}
chunkID := stacktraceID / p.stacktraces.maxNodesPerChunk
localSID := stacktraceID % p.stacktraces.maxNodesPerChunk
if localSID == 0 || int(chunkID) > len(p.stacktraces.chunks) {
return dst
}
return p.stacktraces.chunks[chunkID].tree.resolveUint64(dst, localSID)
}

type stacktracesPartition struct {
maxNodesPerChunk uint32

m sync.RWMutex
hashToIdx map[uint64]uint32
chunks []*stacktraceChunk
return p.stacktraces.tree.resolveUint64(dst, stacktraceID)
}

func (p *PartitionWriter) SplitStacktraceIDRanges(appender *SampleAppender) iter.Iterator[*StacktraceIDRange] {
if len(p.stacktraces.chunks) == 0 {
return iter.NewEmptyIterator[*StacktraceIDRange]()
}
var n int
samples := appender.Samples()
ranges := SplitStacktraces(samples.StacktraceIDs, p.stacktraces.maxNodesPerChunk)
for _, sr := range ranges {
c := p.stacktraces.chunks[sr.chunk]
sr.ParentPointerTree = c.tree
sr.Samples = samples.Range(n, n+len(sr.IDs))
n += len(sr.IDs)
func newStacktraces() *stacktraces {
p := &stacktraces{
hashToIdx: make(map[uint64]uint32),
tree: newStacktraceTree(defaultStacktraceTreeSize),
}
return iter.NewSliceIterator(ranges)
return p
}

func newStacktracesPartition(maxNodesPerChunk uint32) *stacktracesPartition {
p := &stacktracesPartition{
maxNodesPerChunk: maxNodesPerChunk,
hashToIdx: make(map[uint64]uint32, defaultStacktraceTreeSize/2),
}
p.chunks = append(p.chunks, &stacktraceChunk{
tree: newStacktraceTree(defaultStacktraceTreeSize),
partition: p,
})
return p
type stacktraces struct {
m sync.RWMutex
hashToIdx map[uint64]uint32
tree *stacktraceTree
stacks uint32
}

func (p *stacktracesPartition) size() uint64 {
func (p *stacktraces) size() uint64 {
p.m.RLock()
// TODO: map footprint isn't accounted
v := 0
for _, c := range p.chunks {
v += stacktraceTreeNodeSize * cap(c.tree.nodes)
}
v := stacktraceTreeNodeSize * cap(p.tree.nodes)
p.m.RUnlock()
return uint64(v)
}

// stacktraceChunkForInsert returns a chunk for insertion:
// if the existing one has capacity, or a new one, if the former is full.
// Must be called with the stracktraces mutex write lock held.
func (p *stacktracesPartition) stacktraceChunkForInsert(x int) *stacktraceChunk {
c := p.currentStacktraceChunk()
if n := c.tree.len() + uint32(x); p.maxNodesPerChunk > 0 && n >= p.maxNodesPerChunk {
// Calculate number of stacks in the chunk.
s := uint32(len(p.hashToIdx))
c.stacks = s - c.stacks
c = &stacktraceChunk{
partition: p,
tree: newStacktraceTree(defaultStacktraceTreeSize),
stid: c.stid + p.maxNodesPerChunk,
stacks: s,
}
p.chunks = append(p.chunks, c)
}
return c
}

// stacktraceChunkForRead returns a chunk for reads.
// Must be called with the stracktraces mutex read lock held.
func (p *stacktracesPartition) stacktraceChunkForRead(i int) (*stacktraceChunk, bool) {
if i < len(p.chunks) {
return p.chunks[i], true
}
return nil, false
}

func (p *stacktracesPartition) currentStacktraceChunk() *stacktraceChunk {
// Assuming there is at least one chunk.
return p.chunks[len(p.chunks)-1]
}

func (p *stacktracesPartition) append(dst []uint32, s []*schemav1.Stacktrace) {
func (p *stacktraces) append(dst []uint32, s []*schemav1.Stacktrace) {
if len(s) == 0 {
return
}
@@ -160,29 +96,16 @@ func (p *stacktracesPartition) append(dst []uint32, s []*schemav1.Stacktrace) {

p.m.Lock()
defer p.m.Unlock()
chunk := p.currentStacktraceChunk()

m := int(p.maxNodesPerChunk)
t, j := chunk.tree, chunk.stid
for i, v := range dst[:len(s)] {
if v != 0 {
// Already resolved. ID 0 is reserved
// as it is the tree root.
continue
}

x := s[i].LocationIDs
if m > 0 && len(t.nodes)+len(x) >= m {
// If we're close to the max nodes limit and can
// potentially exceed it, we take the next chunk,
// even if there are some space.
chunk = p.stacktraceChunkForInsert(len(x))
t, j = chunk.tree, chunk.stid
}

// Tree insertion is idempotent,
// we don't need to check the map.
id = t.insert(x) + j
id = p.tree.insert(x)
h := hashLocations(x)
p.hashToIdx[h] = id
dst[i] = id
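The append path now inserts every stack trace into the partition's single tree: the trace's location IDs are hashed, hashToIdx maps that hash to an already-assigned stacktrace ID, and unseen traces take the ID returned by the tree insert. A hedged stand-in for that dedup pattern; FNV hashing and a plain slice replace symdb's hashLocations and stacktraceTree, and IDs start at 1 because 0 is reserved for the tree root, as in the original:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// hashLocations here is an FNV-based stand-in for the package's own hashing of
// a stack trace's location IDs.
func hashLocations(locs []uint64) uint64 {
	h := fnv.New64a()
	var buf [8]byte
	for _, l := range locs {
		binary.LittleEndian.PutUint64(buf[:], l)
		h.Write(buf[:])
	}
	return h.Sum64()
}

type stacktraces struct {
	hashToIdx map[uint64]uint32
	traces    [][]uint64 // stand-in for the parent-pointer tree
}

// append assigns each incoming stack an ID, reusing the ID of any previously
// seen identical stack.
func (p *stacktraces) append(dst []uint32, stacks [][]uint64) {
	for i, locs := range stacks {
		h := hashLocations(locs)
		if id, ok := p.hashToIdx[h]; ok {
			dst[i] = id // already resolved
			continue
		}
		id := uint32(len(p.traces)) + 1 // "insert" into the stand-in tree
		p.traces = append(p.traces, locs)
		p.hashToIdx[h] = id
		dst[i] = id
	}
}

func main() {
	p := &stacktraces{hashToIdx: map[uint64]uint32{}}
	dst := make([]uint32, 3)
	p.append(dst, [][]uint64{{1, 2, 3}, {4, 5}, {1, 2, 3}})
	fmt.Println(dst) // [1 2 1]: the first and third stacks share an ID
}
```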
@@ -205,30 +128,9 @@ func (p *stacktraceLocationsPool) put(x []int32) {
stacktraceLocations.Put(x)
}

func (p *stacktracesPartition) resolve(dst StacktraceInserter, stacktraces []uint32) (err error) {
for _, sr := range SplitStacktraces(stacktraces, p.maxNodesPerChunk) {
if err = p.ResolveChunk(dst, sr); err != nil {
return err
}
}
return nil
}

// NOTE(kolesnikovae):
// Caller is able to split a range of stacktrace IDs into chunks
// with SplitStacktraces, and then resolve them concurrently:
// StacktraceInserter could be implemented as a dense set, map,
// slice, or an n-ary tree: the stacktraceTree should be one of
// the options, the package provides.

func (p *stacktracesPartition) ResolveChunk(dst StacktraceInserter, sr *StacktraceIDRange) error {
func (p *stacktraces) resolve(dst StacktraceInserter, stacktraces []uint32) (err error) {
p.m.RLock()
c, found := p.stacktraceChunkForRead(int(sr.chunk))
if !found {
p.m.RUnlock()
return ErrInvalidStacktraceRange
}
t := stacktraceTree{nodes: c.tree.nodes}
t := stacktraceTree{nodes: p.tree.nodes}
// tree.resolve is thread safe: only the parent node index (p)
// and the reference to location (r) node fields are accessed,
// which are never modified after insertion.
@@ -239,25 +141,16 @@ func (p *stacktracesPartition) ResolveChunk(dst StacktraceInserter, sr *Stacktra
// the call.
p.m.RUnlock()
s := stacktraceLocations.get()
// Restore the original stacktrace ID.
off := sr.Offset()
for _, sid := range sr.IDs {
for _, sid := range stacktraces {
s = t.resolve(s, sid)
dst.InsertStacktrace(off+sid, s)
dst.InsertStacktrace(sid, s)
}
stacktraceLocations.put(s)
return nil
}

type stacktraceChunk struct {
partition *stacktracesPartition
tree *stacktraceTree
stid uint32 // Initial stack trace ID.
stacks uint32 //
}

func (s *stacktraceChunk) WriteTo(dst io.Writer) (int64, error) {
return s.tree.WriteTo(dst)
func (p *stacktraces) WriteTo(dst io.Writer) (int64, error) {
return p.tree.WriteTo(dst)
}

func (p *PartitionWriter) AppendLocations(dst []uint32, locations []schemav1.InMemoryLocation) {
@@ -288,8 +181,7 @@ func (p *PartitionWriter) Symbols() *Symbols {

func (p *PartitionWriter) WriteStats(s *PartitionStats) {
p.stacktraces.m.RLock()
c := p.stacktraces.currentStacktraceChunk()
s.MaxStacktraceID = int(c.stid + c.tree.len())
s.MaxStacktraceID = int(p.stacktraces.tree.len())
s.StacktracesTotal = len(p.stacktraces.hashToIdx)
p.stacktraces.m.RUnlock()

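resolve keeps the snapshot-under-read-lock pattern described in the comments above: the nodes slice only grows and existing nodes are never modified after insertion, so a copy of the slice header taken under RLock can be walked after the lock is released. A simplified sketch of that pattern with a toy parent-pointer tree; the node layout is a stand-in, not symdb's real stacktraceTree:

```go
package main

import (
	"fmt"
	"sync"
)

// node keeps a parent index and a location; index 0 is the reserved root.
type node struct {
	parent   int32
	location uint64
}

type tree struct {
	m     sync.RWMutex
	nodes []node
}

// resolve walks parent pointers from id up to the root, collecting locations.
// The slice header is snapshotted under the read lock and read afterwards,
// which is safe because existing elements are never rewritten.
func (t *tree) resolve(dst []uint64, id int32) []uint64 {
	t.m.RLock()
	nodes := t.nodes // snapshot of the slice header; safe to read unlocked
	t.m.RUnlock()
	for id > 0 {
		dst = append(dst, nodes[id].location)
		id = nodes[id].parent
	}
	return dst
}

func main() {
	t := &tree{nodes: []node{{-1, 0}, {0, 10}, {1, 20}}}
	fmt.Println(t.resolve(nil, 2)) // [20 10]: locations from leaf to root
}
```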