Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change servicegraphprocessor to avoid allocating hex strings for map key #16248

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions processor/servicegraphprocessor/internal/store/edge.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ const (

// Edge is an Edge between two nodes in the graph
type Edge struct {
key string
key Key

TraceID pcommon.TraceID
ConnectionType ConnectionType
@@ -48,7 +48,7 @@ type Edge struct {
expiration time.Time
}

func newEdge(key string, ttl time.Duration) *Edge {
func newEdge(key Key, ttl time.Duration) *Edge {
return &Edge{
key: key,
Dimensions: make(map[string]string),
25 changes: 0 additions & 25 deletions processor/servicegraphprocessor/internal/store/interface.go

This file was deleted.

31 changes: 21 additions & 10 deletions processor/servicegraphprocessor/internal/store/store.go
Original file line number Diff line number Diff line change
@@ -19,18 +19,29 @@ import (
"errors"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

var (
ErrTooManyItems = errors.New("too many items")
)

var _ Store = (*store)(nil)
type Callback func(e *Edge)

type Key struct {
tid pcommon.TraceID
sid pcommon.SpanID
}

func NewKey(tid pcommon.TraceID, sid pcommon.SpanID) Key {
return Key{tid: tid, sid: sid}
}

type store struct {
type Store struct {
l *list.List
mtx sync.Mutex
m map[string]*list.Element
m map[Key]*list.Element

onComplete Callback
onExpire Callback
@@ -42,10 +53,10 @@ type store struct {
// NewStore creates a Store to build service graphs. The store caches edges, each representing a
// request between two services. Once an edge is complete its metrics can be collected. Edges that
// have not found their pair are deleted after ttl time.
func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) Store {
s := &store{
func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) *Store {
s := &Store{
l: list.New(),
m: make(map[string]*list.Element),
m: make(map[Key]*list.Element),

onComplete: onComplete,
onExpire: onExpire,
@@ -58,14 +69,14 @@ func NewStore(ttl time.Duration, maxItems int, onComplete, onExpire Callback) St
}

// len is only used for testing.
func (s *store) len() int {
func (s *Store) len() int {
return s.l.Len()
}

// UpsertEdge fetches an Edge from the store and updates it using the given callback. If the Edge
// doesn't exist yet, it creates a new one with the default TTL.
// If the Edge is complete after applying the callback, it's completed and removed.
func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error) {
func (s *Store) UpsertEdge(key Key, update Callback) (isNew bool, err error) {
s.mtx.Lock()
defer s.mtx.Unlock()

@@ -103,7 +114,7 @@ func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error)
}

// Expire evicts all expired items in the store.
func (s *store) Expire() {
func (s *Store) Expire() {
s.mtx.Lock()
defer s.mtx.Unlock()

@@ -116,7 +127,7 @@ func (s *store) Expire() {
// Returns true if the head was evicted.
//
// Must be called holding lock.
func (s *store) tryEvictHead() bool {
func (s *Store) tryEvictHead() bool {
head := s.l.Front()
if head == nil {
return false // list is empty
47 changes: 20 additions & 27 deletions processor/servicegraphprocessor/internal/store/store_test.go
Original file line number Diff line number Diff line change
@@ -15,30 +15,29 @@
package store // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store"

import (
"fmt"
"encoding/hex"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
)

const clientService = "client"

func TestStoreUpsertEdge(t *testing.T) {
const keyStr = "key"
key := NewKey(pcommon.TraceID([16]byte{1, 2, 3}), pcommon.SpanID([8]byte{1, 2, 3}))

var onCompletedCount int
var onExpireCount int

storeInterface := NewStore(time.Hour, 1, countingCallback(&onCompletedCount), countingCallback(&onExpireCount))

s := storeInterface.(*store)
s := NewStore(time.Hour, 1, countingCallback(&onCompletedCount), countingCallback(&onExpireCount))
assert.Equal(t, 0, s.len())

// Insert first half of an edge
isNew, err := s.UpsertEdge(keyStr, func(e *Edge) {
isNew, err := s.UpsertEdge(key, func(e *Edge) {
e.ClientService = clientService
})
require.NoError(t, err)
@@ -51,7 +50,7 @@ func TestStoreUpsertEdge(t *testing.T) {
assert.Equal(t, 0, onExpireCount)

// Insert the second half of an edge
isNew, err = s.UpsertEdge(keyStr, func(e *Edge) {
isNew, err = s.UpsertEdge(key, func(e *Edge) {
assert.Equal(t, clientService, e.ClientService)
e.ServerService = "server"
})
@@ -64,7 +63,7 @@ func TestStoreUpsertEdge(t *testing.T) {
assert.Equal(t, 0, onExpireCount)

// Insert an edge that will immediately expire
isNew, err = s.UpsertEdge(keyStr, func(e *Edge) {
isNew, err = s.UpsertEdge(key, func(e *Edge) {
e.ClientService = clientService
e.expiration = time.UnixMicro(0)
})
@@ -81,27 +80,27 @@ func TestStoreUpsertEdge(t *testing.T) {
}

func TestStoreUpsertEdge_errTooManyItems(t *testing.T) {
key1 := NewKey(pcommon.TraceID([16]byte{1, 2, 3}), pcommon.SpanID([8]byte{1, 2, 3}))
key2 := NewKey(pcommon.TraceID([16]byte{4, 5, 6}), pcommon.SpanID([8]byte{1, 2, 3}))
var onCallbackCounter int

storeInterface := NewStore(time.Hour, 1, countingCallback(&onCallbackCounter), countingCallback(&onCallbackCounter))

s := storeInterface.(*store)
s := NewStore(time.Hour, 1, countingCallback(&onCallbackCounter), countingCallback(&onCallbackCounter))
assert.Equal(t, 0, s.len())

isNew, err := s.UpsertEdge("key-1", func(e *Edge) {
isNew, err := s.UpsertEdge(key1, func(e *Edge) {
e.ClientService = clientService
})
require.NoError(t, err)
require.Equal(t, true, isNew)
assert.Equal(t, 1, s.len())

_, err = s.UpsertEdge("key-2", func(e *Edge) {
_, err = s.UpsertEdge(key2, func(e *Edge) {
e.ClientService = clientService
})
require.ErrorIs(t, err, ErrTooManyItems)
assert.Equal(t, 1, s.len())

isNew, err = s.UpsertEdge("key-1", func(e *Edge) {
isNew, err = s.UpsertEdge(key1, func(e *Edge) {
e.ClientService = clientService
})
require.NoError(t, err)
@@ -114,9 +113,9 @@ func TestStoreUpsertEdge_errTooManyItems(t *testing.T) {
func TestStoreExpire(t *testing.T) {
const testSize = 100

keys := map[string]bool{}
keys := map[Key]struct{}{}
for i := 0; i < testSize; i++ {
keys[fmt.Sprintf("key-%d", i)] = true
keys[NewKey(pcommon.TraceID([16]byte{byte(i)}), pcommon.SpanID([8]byte{1, 2, 3}))] = struct{}{}
}

var onCompletedCount int
@@ -127,8 +126,7 @@ func TestStoreExpire(t *testing.T) {
assert.Contains(t, keys, e.key)
}
// New edges are immediately expired
storeInterface := NewStore(-time.Second, testSize, onComplete, countingCallback(&onExpireCount))
s := storeInterface.(*store)
s := NewStore(-time.Second, testSize, onComplete, countingCallback(&onExpireCount))

for key := range keys {
isNew, err := s.UpsertEdge(key, noopCallback)
@@ -142,7 +140,7 @@ func TestStoreExpire(t *testing.T) {
assert.Equal(t, testSize, onExpireCount)
}

func TestStore_concurrency(t *testing.T) {
func TestStoreConcurrency(t *testing.T) {
s := NewStore(10*time.Millisecond, 100000, noopCallback, noopCallback)

end := make(chan struct{})
@@ -158,16 +156,11 @@ func TestStore_concurrency(t *testing.T) {
}
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

go accessor(func() {
key := make([]rune, 6)
for i := range key {
key[i] = letters[rand.Intn(len(letters))]
}
key := NewKey(pcommon.TraceID([16]byte{byte(rand.Intn(32))}), pcommon.SpanID([8]byte{1, 2, 3}))

_, err := s.UpsertEdge(string(key), func(e *Edge) {
e.ClientService = string(key)
_, err := s.UpsertEdge(key, func(e *Edge) {
e.ClientService = hex.EncodeToString(key.tid[:])
})
assert.NoError(t, err)
})
14 changes: 3 additions & 11 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ type processor struct {
nextConsumer consumer.Traces
metricsExporter consumer.Metrics

store store.Store
store *store.Store

startTime time.Time

@@ -196,7 +196,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err
fallthrough
case ptrace.SpanKindClient:
traceID := span.TraceID()
key := buildEdgeKey(traceID.HexString(), span.SpanID().HexString())
key := store.NewKey(traceID, span.SpanID())
isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) {
e.TraceID = traceID
e.ConnectionType = connectionType
@@ -219,7 +219,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err
fallthrough
case ptrace.SpanKindServer:
traceID := span.TraceID()
key := buildEdgeKey(traceID.HexString(), span.ParentSpanID().HexString())
key := store.NewKey(traceID, span.ParentSpanID())
isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) {
e.TraceID = traceID
e.ConnectionType = connectionType
@@ -491,14 +491,6 @@ func (p *processor) cleanCache() {
}
}

func buildEdgeKey(k1, k2 string) string {
var b strings.Builder
b.WriteString(k1)
b.WriteString("-")
b.WriteString(k2)
return b.String()
}

// durationToMillis converts the given duration to the number of milliseconds it represents.
// Note that this can return sub-millisecond (i.e. < 1ms) values as well.
func durationToMillis(d time.Duration) float64 {