Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/spanmetrics] use LRU cache for metricKeyToDimensions #6503

Merged
merged 50 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d66b65a
use LRU cache for `metricKeyToDimensions` so that the memory usage will
chenzhihao Dec 2, 2021
6bfd657
rename configuration attr
chenzhihao Dec 6, 2021
bda780d
add Cache struct to hold evcited items
chenzhihao Dec 6, 2021
6654260
update README for the new paramter
chenzhihao Dec 6, 2021
03a2660
fix typo
chenzhihao Dec 6, 2021
20b0627
fix lint
chenzhihao Dec 6, 2021
5b0bc7f
add license
chenzhihao Dec 6, 2021
633b35b
fix lint
chenzhihao Dec 6, 2021
2a2b20e
fix the logic for GetMetricKeyToDimensionsCacheSize()
chenzhihao Dec 6, 2021
5ecabe9
fix typo
chenzhihao Dec 6, 2021
dd1a28e
update comments
chenzhihao Dec 6, 2021
bcef423
refactory for how the processor fetch cached dimensions
chenzhihao Dec 6, 2021
9c53dd5
refactory for `metric_key_to_dimensions_cache_size` configuration
chenzhihao Dec 6, 2021
881c5e6
use assert.Len instead
chenzhihao Dec 8, 2021
27755d2
remove named return
chenzhihao Dec 8, 2021
fc801a1
update comments
chenzhihao Dec 8, 2021
6cbbb94
use `ContainsOrAdd` instead
chenzhihao Dec 8, 2021
d2cec70
panic instead of swallowing error
chenzhihao Dec 8, 2021
6af84c0
remove named return
chenzhihao Dec 8, 2021
7a69dd9
make function more readble
chenzhihao Dec 8, 2021
28753f7
Apply suggestions from code review
chenzhihao Dec 8, 2021
791616c
fix panic logic
chenzhihao Dec 8, 2021
88252d2
use assert.Empty
chenzhihao Dec 8, 2021
13402d2
move Cache to a individual package
chenzhihao Dec 8, 2021
1cc5232
fix comments typo/grammer error
chenzhihao Dec 8, 2021
b072d29
remove the redundant logic for setting the default Cache size
chenzhihao Dec 9, 2021
92aa352
empty commit to rerun test
chenzhihao Dec 9, 2021
c7e1f45
Merge branch 'main' into OBC-259_LRU
chenzhihao Dec 9, 2021
d5e2e27
Merge remote-tracking branch 'origin/main' into OBC-259_LRU
chenzhihao Dec 12, 2021
a450d01
fix race condition issue
chenzhihao Dec 12, 2021
c2c7ca4
run tests in parallel
chenzhihao Dec 12, 2021
7835082
code formatting
chenzhihao Dec 13, 2021
ffed6db
rename the configuration naming
chenzhihao Dec 13, 2021
30a35dc
format code; getDimensionsByMetricKey returns error instead of bool
chenzhihao Dec 13, 2021
fcd3c71
stop using reflect for test
chenzhihao Dec 13, 2021
733aff3
add the change to the changelog
chenzhihao Dec 13, 2021
84c5d20
fix mutex copy issue by using pointer
chenzhihao Dec 13, 2021
fa0305b
check if there is any error during initialization
chenzhihao Dec 13, 2021
7167a96
rename rwMutex
chenzhihao Dec 13, 2021
d923c39
empty commit to rerun test
chenzhihao Dec 13, 2021
e6018b3
move cache package to internal
chenzhihao Dec 14, 2021
0f8b8c8
empty commit to rerun test
chenzhihao Dec 14, 2021
52f7457
Merge branch 'main' into OBC-259_LRU
chenzhihao Dec 14, 2021
f157c39
add purge method
chenzhihao Dec 14, 2021
abdf704
gofmt
chenzhihao Dec 15, 2021
08e95a6
Update processor/spanmetricsprocessor/processor.go
chenzhihao Dec 15, 2021
bf868ff
Update processor/spanmetricsprocessor/processor.go
chenzhihao Dec 15, 2021
ad9874c
Update processor/spanmetricsprocessor/internal/cache/cache.go
chenzhihao Dec 15, 2021
0e49ac5
Update CHANGELOG.md
chenzhihao Dec 15, 2021
393a358
propagate errors
chenzhihao Dec 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

- `lokiexporter`: add complete log record to body (#6619)

- `spanmetricproccessor`: use an LRU cache for the cached Dimensions key-value pairs (#2179)

## v0.41.0

## 🛑 Breaking changes 🛑
Expand Down
5 changes: 4 additions & 1 deletion processor/spanmetricsprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ The following settings can be optionally configured:

If the `name`d attribute is missing in the span, the optional provided `default` is used.

If no `default` is provided, this dimension will be **omitted** from the metric.
If no `default` is provided, this dimension will be **omitted** from the metric.
- `dimensions_cache_size`: the max items number of `metric_key_to_dimensions_cache`. If not provided, will
use default value size `1000`.
- `aggregation_temporality`: Defines the aggregation temporality of the generated metrics.
One of either `AGGREGATION_TEMPORALITY_CUMULATIVE` or `AGGREGATION_TEMPORALITY_DELTA`.
- Default: `AGGREGATION_TEMPORALITY_CUMULATIVE`
Expand Down Expand Up @@ -93,6 +95,7 @@ processors:
- name: http.method
default: GET
- name: http.status_code
dimensions_cache_size: 1000
aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"

exporters:
Expand Down
5 changes: 5 additions & 0 deletions processor/spanmetricsprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type Config struct {
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go.
Dimensions []Dimension `mapstructure:"dimensions"`

// DimensionsCacheSize defines the size of cache for storing Dimensions, which helps to avoid cache memory growing
// indefinitely over the lifetime of the collector.
// Optional. See defaultDimensionsCacheSize in processor.go for the default value.
DimensionsCacheSize int `mapstructure:"dimensions_cache_size"`

AggregationTemporality string `mapstructure:"aggregation_temporality"`
}

Expand Down
17 changes: 15 additions & 2 deletions processor/spanmetricsprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,21 @@ func TestLoadConfig(t *testing.T) {
wantMetricsExporter string
wantLatencyHistogramBuckets []time.Duration
wantDimensions []Dimension
wantDimensionsCacheSize int
wantAggregationTemporality string
}{
{configFile: "config-2-pipelines.yaml", wantMetricsExporter: "prometheus", wantAggregationTemporality: cumulative},
{configFile: "config-3-pipelines.yaml", wantMetricsExporter: "otlp/spanmetrics", wantAggregationTemporality: cumulative},
{
configFile: "config-2-pipelines.yaml",
wantMetricsExporter: "prometheus",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: 500,
},
{
configFile: "config-3-pipelines.yaml",
wantMetricsExporter: "otlp/spanmetrics",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: defaultDimensionsCacheSize,
},
{
configFile: "config-full.yaml",
wantMetricsExporter: "otlp/spanmetrics",
Expand All @@ -61,6 +72,7 @@ func TestLoadConfig(t *testing.T) {
{"http.method", &defaultMethod},
{"http.status_code", nil},
},
wantDimensionsCacheSize: 1500,
wantAggregationTemporality: delta,
},
}
Expand Down Expand Up @@ -92,6 +104,7 @@ func TestLoadConfig(t *testing.T) {
MetricsExporter: tc.wantMetricsExporter,
LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets,
Dimensions: tc.wantDimensions,
DimensionsCacheSize: tc.wantDimensionsCacheSize,
AggregationTemporality: tc.wantAggregationTemporality,
},
cfg.Processors[config.NewComponentID(typeStr)],
Expand Down
1 change: 1 addition & 0 deletions processor/spanmetricsprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
}
}

Expand Down
1 change: 1 addition & 0 deletions processor/spanmetricsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanm
go 1.17

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter v0.41.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.41.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.41.0
Expand Down
79 changes: 79 additions & 0 deletions processor/spanmetricsprocessor/internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"

import (
"sync"

lru "github.com/hashicorp/golang-lru"
)

// Cache consists of an LRU cache and the evicted items from the LRU cache.
// This data structure makes sure all the cached items can be retrieved either from the LRU cache or the evictedItems
// map. In spanmetricsprocessor's use case, we need to hold all the items during the current processing step for
// building the metrics. The evicted items can/should be safely removed once the metrics are built from the current
// batch of spans.
type Cache struct {
*lru.Cache
rw *sync.RWMutex
evictedItems map[interface{}]interface{}
}

// NewCache creates a Cache.
func NewCache(size int) (*Cache, error) {
evictedItems := make(map[interface{}]interface{})
rw := new(sync.RWMutex)
lruCache, err := lru.NewWithEvict(size, func(key interface{}, value interface{}) {
rw.Lock()
evictedItems[key] = value
rw.Unlock()
})
if err != nil {
return nil, err
}

return &Cache{
Cache: lruCache,
evictedItems: evictedItems,
rw: rw,
}, nil
}

// RemoveEvictedItems cleans all the evicted items.
func (c *Cache) RemoveEvictedItems() {
c.rw.Lock()
// we need to keep the original pointer to evictedItems map as it is used in the closure of lru.NewWithEvict
for k := range c.evictedItems {
delete(c.evictedItems, k)
}
c.rw.Unlock()
}

// Get retrieves an item from the LRU cache or evicted items.
func (c *Cache) Get(key interface{}) (interface{}, bool) {
if val, ok := c.Cache.Get(key); ok {
return val, ok
}
c.rw.RLock()
val, ok := c.evictedItems[key]
c.rw.RUnlock()
return val, ok
}

// Purge removes all the items from the LRU cache and evicted items.
func (c *Cache) Purge() {
c.Cache.Purge()
c.RemoveEvictedItems()
}
212 changes: 212 additions & 0 deletions processor/spanmetricsprocessor/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewCache(t *testing.T) {
type args struct {
size int
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "create a new Cache with length 10",
args: args{
size: 10,
},
wantErr: false,
},
{
name: "create a new Cache with length -1",
args: args{
size: -1,
},
wantErr: true,
},
{
name: "create a new Cache with length 0",
args: args{
size: 0,
},
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
_, err := NewCache(tt.args.size)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
})
}
}

func TestCache_Get(t *testing.T) {
tests := []struct {
name string
lruCache func() *Cache
evictedItems map[interface{}]interface{}
key interface{}
wantValue interface{}
wantOk bool
}{
{
name: "if key is not found in LRUCache, will get key from evictedItems",
lruCache: func() *Cache {
cache, _ := NewCache(1)
cache.evictedItems["key"] = "val"
return cache
},
key: "key",
wantValue: "val",
wantOk: true,
},
{
name: "if key is found in LRUCache, return the found item",
lruCache: func() *Cache {
cache, _ := NewCache(1)
cache.Add("key", "val_from_LRU")
cache.evictedItems["key"] = "val_from_evicted_items"
return cache
},
key: "key",
wantValue: "val_from_LRU",
wantOk: true,
},
{
name: "if key is not found either in LRUCache or evicted items, return nothing",
lruCache: func() *Cache {
cache, _ := NewCache(1)
return cache
},
key: "key",
wantValue: nil,
wantOk: false,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
c := tt.lruCache()
gotValue, gotOk := c.Get(tt.key)
if !assert.Equal(t, gotValue, tt.wantValue) {
t.Errorf("Get() gotValue = %v, want %v", gotValue, tt.wantValue)
}
if gotOk != tt.wantOk {
t.Errorf("Get() gotOk = %v, want %v", gotOk, tt.wantOk)
}
})
}
}

func TestCache_RemoveEvictedItems(t *testing.T) {
tests := []struct {
name string
lruCache func() (*Cache, error)
}{
{
name: "no panic when there is no evicted item to remove",
lruCache: func() (*Cache, error) {
return NewCache(1)
},
},
{
name: "evicted items should be removed",
lruCache: func() (*Cache, error) {
cache, err := NewCache(1)
if err != nil {
return nil, err
}
cache.evictedItems["key0"] = "val0"
cache.evictedItems["key1"] = "val1"
return cache, nil
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
cache, err := tt.lruCache()
assert.NoError(t, err)
cache.RemoveEvictedItems()
assert.Empty(t, cache.evictedItems)
})
}
}

func TestCache_PurgeItems(t *testing.T) {
tests := []struct {
name string
lruCache func() (*Cache, error)
}{
{
name: "no panic when there is no item to remove",
lruCache: func() (*Cache, error) {
return NewCache(1)
},
},
{
name: "remove items from the lru cache",
lruCache: func() (*Cache, error) {
cache, err := NewCache(1)
if err != nil {
return nil, err
}
cache.evictedItems["key0"] = "val0"
cache.evictedItems["key1"] = "val1"
return cache, nil
},
},
{
name: "remove all the items from lru cache and the evicted items",
lruCache: func() (*Cache, error) {
cache, err := NewCache(10)
if err != nil {
return nil, err
}
cache.Add("key", "val")
cache.Add("key2", "val2")
cache.evictedItems["key0"] = "val0"
cache.evictedItems["key1"] = "val1"
return cache, nil
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
cache, err := tt.lruCache()
assert.NoError(t, err)
cache.Purge()
assert.Zero(t, cache.Len())
assert.Empty(t, cache.evictedItems)
})
}
}
Loading