[processor/lsminterval] Optimize pebble usage #270

Merged · 5 commits · Jan 6, 2025
39 changes: 31 additions & 8 deletions processor/lsmintervalprocessor/internal/data/expo/merge.go
@@ -28,7 +28,11 @@ import (
)

// Merge combines the counts of buckets a and b into a.
// Both buckets MUST be of the same scale.
//
// The code has been modified from the upstream code to optimize allocations
// performed while merging the histograms. This also makes the Merge operation
// mutating w.r.t. the brel bucket.
func Merge(arel, brel Buckets) {
if brel.BucketCounts().Len() == 0 {
return
@@ -45,13 +49,32 @@ func Merge(arel, brel Buckets) {

size := up - lo

counts := pcommon.NewUInt64Slice()
counts.Append(make([]uint64, size-counts.Len())...)

for i := 0; i < counts.Len(); i++ {
counts.SetAt(i, a.Abs(lo+i)+b.Abs(lo+i))
// TODO (lahsivjar): the below optimization is not able to take advantage
// of slices with greater length than what is required as the pdata model
// does not allow reslicing:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12004
switch {
case a.Lower() == lo && size == a.BucketCounts().Len():
counts := a.BucketCounts()
for i := 0; i < size; i++ {
val := a.Abs(lo+i) + b.Abs(lo+i)
counts.SetAt(i, val)
}
case b.Lower() == lo && size == b.BucketCounts().Len():
counts := b.BucketCounts()
for i := 0; i < size; i++ {
val := a.Abs(lo+i) + b.Abs(lo+i)
counts.SetAt(i, val)
}
counts.MoveTo(a.BucketCounts())
default:
counts := pcommon.NewUInt64Slice()
counts.EnsureCapacity(size)
for i := 0; i < size; i++ {
val := a.Abs(lo+i) + b.Abs(lo+i)
counts.Append(val)
}
counts.MoveTo(a.BucketCounts())
}

a.SetOffset(int32(lo))
counts.MoveTo(a.BucketCounts())
}
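For context on the allocation change above: the previous implementation always built a fresh slice by appending a zeroed []uint64 of the target size and then overwriting each element with SetAt, while the default case now sizes the slice once with EnsureCapacity and appends the summed values directly. A minimal, self-contained sketch of that pattern follows (the helper name sumCounts and the sample values are illustrative only, not part of this PR):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// sumCounts adds two equally sized count slices into a freshly allocated
// pcommon.UInt64Slice, sizing the backing array once via EnsureCapacity
// instead of appending a zeroed []uint64 and then overwriting it with SetAt.
func sumCounts(a, b []uint64) pcommon.UInt64Slice {
	counts := pcommon.NewUInt64Slice()
	counts.EnsureCapacity(len(a)) // single up-front allocation
	for i := range a {
		counts.Append(a[i] + b[i])
	}
	return counts
}

func main() {
	merged := sumCounts([]uint64{1, 2, 3}, []uint64{4, 5, 6})
	fmt.Println(merged.AsRaw()) // [5 7 9]
}

The two non-default cases in the diff go further: when either input already has the required offset and length, its existing backing slice is reused in place, so no new slice is allocated at all.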
47 changes: 42 additions & 5 deletions processor/lsmintervalprocessor/processor.go
@@ -39,8 +39,38 @@ import (
var _ processor.Metrics = (*Processor)(nil)
var zeroTime = time.Unix(0, 0).UTC()

// TODO (lahsivjar): Optimize pebble
const batchCommitThreshold = 16 << 20 // 16MB
const (
// pebbleMemTableSize defines the max steady state size of a memtable.
// There can be more than 1 memtable in memory at a time as it takes
// time for an old memtable to flush. The memtable size also defines
// the size for large batches. A large batch is a batch which will
// take at least half of the memtable size. Note that the Batch#Len
// is not the same as the memtable size that the batch will occupy
// as data in batches is encoded differently. In general, the
// memtable size of the batch will be higher than the length of the
// batch data.
//
// On commit, data in the large batch may be kept by pebble and thus
// large batches will need to be reallocated. Note that large batch
// classification uses the memtable size that a batch will occupy
// rather than the length of the data slice backing the batch.
pebbleMemTableSize = 64 << 20 // 64MB
@lahsivjar (Contributor Author) commented on Dec 24, 2024:

[For reviewers] I have gone a bit high here; this would mean we would require at least 64*2 = 128MB of memory for memtables (due to the max number of memtables as configured via pebbleMemTableStopWritesThreshold). This should be good for most of our use cases. We could also expose these configs in the component configuration for tweaking in the future, but I haven't done that in this PR.
// pebbleMemTableStopWritesThreshold is the hard limit on the maximum
// number of memtables that can be queued before writes are stopped.
// This value should be at least 2, or writes will stop whenever a
// MemTable is being flushed.
pebbleMemTableStopWritesThreshold = 2

// dbCommitThresholdBytes is a soft limit; the batch is committed to
// the DB as soon as it crosses this threshold. To make sure that the
// commit threshold plays well with the max retained batch size, the
// threshold should be kept smaller than the sum of the max retained
// batch size and the encoded size of the aggregated data to be
// committed. However, this requires https://github.com/cockroachdb/pebble/pull/3139,
// so for now we are only tweaking the available options.
dbCommitThresholdBytes = 8 << 20 // 8MB
)

type Processor struct {
passthrough PassThrough
@@ -75,6 +105,8 @@ func newProcessor(cfg *Config, ivlDefs []intervalDef, log *zap.Logger, next cons
return merger.New(v), nil
},
},
MemTableSize: pebbleMemTableSize,
MemTableStopWritesThreshold: pebbleMemTableStopWritesThreshold,
}
writeOpts := pebble.Sync
dataDir := cfg.Directory
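To make the new pebble tuning concrete, here is a hedged, standalone sketch of opening a DB with these options (the directory path is a placeholder; the real processor wires them into the same Options struct that also carries the Merger, as shown in the hunk above):

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	opts := &pebble.Options{
		// A 64MB steady-state memtable with a stop-writes threshold of 2
		// means the worst case for queued memtables is roughly
		// 64MB * 2 = 128MB, as noted in the review comment above.
		MemTableSize:                64 << 20,
		MemTableStopWritesThreshold: 2,
	}
	db, err := pebble.Open("/tmp/lsminterval-example", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}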
@@ -258,8 +290,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
}

if p.batch == nil {
// TODO (lahsivjar): investigate possible optimization by using NewBatchWithSize
p.batch = p.db.NewBatch()
p.batch = newBatch(p.db)
}

for _, k := range keys {
Expand All @@ -268,7 +299,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
}
}

if p.batch.Len() >= batchCommitThreshold {
if p.batch.Len() >= dbCommitThresholdBytes {
if err := p.batch.Commit(p.wOpts); err != nil {
return errors.Join(append(errs, fmt.Errorf("failed to commit a batch to db: %w", err))...)
}
@@ -433,3 +464,9 @@ func (p *Processor) exportForInterval(
}
return exportedDPCount, nil
}

func newBatch(db *pebble.DB) *pebble.Batch {
// TODO (lahsivjar): Optimize batch as per our needs
// Requires release of https://github.com/cockroachdb/pebble/pull/3139
return db.NewBatch()
}
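To illustrate how dbCommitThresholdBytes acts as a soft limit on the write path, below is a hedged, standalone sketch (plain Set calls, a synthetic key space, and fatal error handling are simplifications; the actual processor writes merge operands for aggregated metrics rather than plain values):

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
)

const dbCommitThresholdBytes = 8 << 20 // soft limit, same value as the processor

func main() {
	db, err := pebble.Open("/tmp/lsminterval-batch-example", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	batch := db.NewBatch()
	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("interval|%04d", i)
		if err := batch.Set([]byte(key), []byte("payload"), nil); err != nil {
			log.Fatal(err)
		}
		// Commit as soon as the batch crosses the soft threshold and
		// start a fresh batch, mirroring the check in ConsumeMetrics.
		if batch.Len() >= dbCommitThresholdBytes {
			if err := batch.Commit(pebble.Sync); err != nil {
				log.Fatal(err)
			}
			if err := batch.Close(); err != nil {
				log.Fatal(err)
			}
			batch = db.NewBatch()
		}
	}
	// Flush whatever is left in the final partial batch.
	if err := batch.Commit(pebble.Sync); err != nil {
		log.Fatal(err)
	}
}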