Skip to content

Commit

Permalink
merge bucketed subscope to V4 release (#191)
Browse files Browse the repository at this point in the history
* increment version

* Use bucketed subscope (#184)

* remove unused function

Co-authored-by: Cristian Velazquez <cdvr1993@gmail.com>
  • Loading branch information
brawndou and cdvr1993 authored Dec 6, 2022
1 parent 92a308b commit 7f86cc4
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 61 deletions.
17 changes: 9 additions & 8 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ type scope struct {

// ScopeOptions is a set of options to construct a scope.
type ScopeOptions struct {
Tags map[string]string
Prefix string
Reporter StatsReporter
CachedReporter CachedStatsReporter
Separator string
DefaultBuckets Buckets
SanitizeOptions *SanitizeOptions
Tags map[string]string
Prefix string
Reporter StatsReporter
CachedReporter CachedStatsReporter
Separator string
DefaultBuckets Buckets
SanitizeOptions *SanitizeOptions
registryShardCount uint
}

// NewRootScope creates a new root Scope with a set of options and
Expand Down Expand Up @@ -171,7 +172,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
s.tags = s.copyAndSanitizeMap(opts.Tags)

// Register the root scope
s.registry = newScopeRegistry(s)
s.registry = newScopeRegistryWithShardCount(s, opts.registryShardCount)

if interval > 0 {
s.wg.Add(1)
Expand Down
75 changes: 75 additions & 0 deletions scope_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package tally
import (
"fmt"
"strconv"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -139,6 +140,80 @@ func BenchmarkScopeTaggedNoCachedSubscopes(b *testing.B) {
}
}

func BenchmarkScopeTaggedNoCachedSubscopesParallel(b *testing.B) {
root, _ := NewRootScope(ScopeOptions{
Prefix: "funkytown",
Reporter: NullStatsReporter,
Tags: map[string]string{
"style": "funky",
"hair": "wavy",
"jefferson": "starship",
},
}, 0)

b.ResetTimer()

index := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n := atomic.AddInt64(&index, 1)
value := strconv.Itoa(int(n))

// Validated that the compiler is not optimizing this with a cpu profiler.
// Check https://github.com/uber-go/tally/pull/184 for more details
root.Tagged(map[string]string{
"foo": value,
"baz": value,
"qux": value,
})
}
})
}

func BenchmarkScopeTaggedNoCachedSubscopesParallelPercentageCached(b *testing.B) {
percentageCached := int64(5)
root, _ := NewRootScope(ScopeOptions{
Prefix: "funkytown",
Reporter: NullStatsReporter,
Tags: map[string]string{
"style": "funky",
"hair": "wavy",
"jefferson": "starship",
},
}, 0)

cachedMap := map[string]string{
"foo": "any",
"baz": "any",
"qux": "any",
}
root.Tagged(cachedMap)

b.ResetTimer()

index := int64(-1)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n := atomic.AddInt64(&index, 1)

if (n % 100) < percentageCached {
root.Tagged(cachedMap)
continue
}

value := strconv.Itoa(int(n))

// Validated that the compiler is not optimizing this with a cpu profiler.
// Check https://github.com/uber-go/tally/pull/184 for more details
root.Tagged(map[string]string{
"foo": value,
"baz": value,
"qux": value,
})
}
})
}

func BenchmarkNameGenerationNoPrefix(b *testing.B) {
root, _ := NewRootScope(ScopeOptions{
Reporter: NullStatsReporter,
Expand Down
143 changes: 90 additions & 53 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,64 +21,92 @@
package tally

import (
"hash/maphash"
"runtime"
"sync"
"unsafe"
)

var scopeRegistryKey = keyForPrefixedStringMaps

type scopeRegistry struct {
mu sync.RWMutex
root *scope
subscopes map[string]*scope
seed maphash.Seed
root *scope
// We need a subscope per GOPROC so that we can take advantage of all the cpu available to the application.
subscopes []*scopeBucket
}

func newScopeRegistry(root *scope) *scopeRegistry {
type scopeBucket struct {
mu sync.RWMutex
s map[string]*scope
}

func newScopeRegistryWithShardCount(root *scope, shardCount uint) *scopeRegistry {
if shardCount == 0 {
shardCount = uint(runtime.GOMAXPROCS(-1))
}

r := &scopeRegistry{
root: root,
subscopes: make(map[string]*scope),
subscopes: make([]*scopeBucket, shardCount),
seed: maphash.MakeSeed(),
}

for i := uint(0); i < shardCount; i++ {
r.subscopes[i] = &scopeBucket{
s: make(map[string]*scope),
}
r.subscopes[i].s[scopeRegistryKey(root.prefix, root.tags)] = root
}
r.subscopes[scopeRegistryKey(root.prefix, root.tags)] = root

return r
}

func (r *scopeRegistry) Report(reporter StatsReporter) {
defer r.purgeIfRootClosed()
r.mu.RLock()
defer r.mu.RUnlock()

for name, s := range r.subscopes {
s.report(reporter)
for _, subscopeBucket := range r.subscopes {
subscopeBucket.mu.RLock()

if s.closed.Load() {
r.removeWithRLock(name)
s.clearMetrics()
for name, s := range subscopeBucket.s {
s.report(reporter)

if s.closed.Load() {
r.removeWithRLock(subscopeBucket, name)
s.clearMetrics()
}
}

subscopeBucket.mu.RUnlock()
}
}

func (r *scopeRegistry) CachedReport() {
defer r.purgeIfRootClosed()

r.mu.RLock()
defer r.mu.RUnlock()
for _, subscopeBucket := range r.subscopes {
subscopeBucket.mu.RLock()

for name, s := range r.subscopes {
s.cachedReport()
for name, s := range subscopeBucket.s {
s.cachedReport()

if s.closed.Load() {
r.removeWithRLock(name)
s.clearMetrics()
if s.closed.Load() {
r.removeWithRLock(subscopeBucket, name)
s.clearMetrics()
}
}

subscopeBucket.mu.RUnlock()
}
}

func (r *scopeRegistry) ForEachScope(f func(*scope)) {
r.mu.RLock()
defer r.mu.RUnlock()

for _, s := range r.subscopes {
f(s)
for _, subscopeBucket := range r.subscopes {
for _, s := range subscopeBucket.s {
subscopeBucket.mu.RLock()
f(s)
subscopeBucket.mu.RUnlock()
}
}
}

Expand All @@ -87,29 +115,37 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
return NoopScope.(*scope)
}

buf := keyForPrefixedStringMapsAsKey(make([]byte, 0, 256), prefix, parent.tags, tags)
r.mu.RLock()
var (
buf = keyForPrefixedStringMapsAsKey(make([]byte, 0, 256), prefix, parent.tags, tags)
h maphash.Hash
)

h.SetSeed(r.seed)
_, _ = h.Write(buf)
subscopeBucket := r.subscopes[h.Sum64()%uint64(len(r.subscopes))]

subscopeBucket.mu.RLock()
// buf is stack allocated and casting it to a string for lookup from the cache
// as the memory layout of []byte is a superset of string the below casting is safe and does not do any alloc
// However it cannot be used outside of the stack; a heap allocation is needed if that string needs to be stored
// in the map as a key
if s, ok := r.lockedLookup(*(*string)(unsafe.Pointer(&buf))); ok {
r.mu.RUnlock()
if s, ok := r.lockedLookup(subscopeBucket, *(*string)(unsafe.Pointer(&buf))); ok {
subscopeBucket.mu.RUnlock()
return s
}
r.mu.RUnlock()
subscopeBucket.mu.RUnlock()

// heap allocating the buf as a string to keep the key in the subscopes map
preSanitizeKey := string(buf)
tags = parent.copyAndSanitizeMap(tags)
key := scopeRegistryKey(prefix, parent.tags, tags)

r.mu.Lock()
defer r.mu.Unlock()
subscopeBucket.mu.Lock()
defer subscopeBucket.mu.Unlock()

if s, ok := r.lockedLookup(key); ok {
if _, ok = r.lockedLookup(preSanitizeKey); !ok {
r.subscopes[preSanitizeKey] = s
if s, ok := r.lockedLookup(subscopeBucket, key); ok {
if _, ok = r.lockedLookup(subscopeBucket, preSanitizeKey); !ok {
subscopeBucket.s[preSanitizeKey] = s
}
return s
}
Expand Down Expand Up @@ -138,15 +174,15 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
bucketCache: parent.bucketCache,
done: make(chan struct{}),
}
r.subscopes[key] = subscope
if _, ok := r.lockedLookup(preSanitizeKey); !ok {
r.subscopes[preSanitizeKey] = subscope
subscopeBucket.s[key] = subscope
if _, ok := r.lockedLookup(subscopeBucket, preSanitizeKey); !ok {
subscopeBucket.s[preSanitizeKey] = subscope
}
return subscope
}

func (r *scopeRegistry) lockedLookup(key string) (*scope, bool) {
ss, ok := r.subscopes[key]
func (r *scopeRegistry) lockedLookup(subscopeBucket *scopeBucket, key string) (*scope, bool) {
ss, ok := subscopeBucket.s[key]
return ss, ok
}

Expand All @@ -155,22 +191,23 @@ func (r *scopeRegistry) purgeIfRootClosed() {
return
}

r.mu.Lock()
defer r.mu.Unlock()

for k, s := range r.subscopes {
_ = s.Close()
s.clearMetrics()
delete(r.subscopes, k)
for _, subscopeBucket := range r.subscopes {
subscopeBucket.mu.Lock()
for k, s := range subscopeBucket.s {
_ = s.Close()
s.clearMetrics()
delete(subscopeBucket.s, k)
}
subscopeBucket.mu.Unlock()
}
}

func (r *scopeRegistry) removeWithRLock(key string) {
func (r *scopeRegistry) removeWithRLock(subscopeBucket *scopeBucket, key string) {
// n.b. This function must lock the registry for writing and return it to an
// RLocked state prior to exiting. Defer order is important (LIFO).
r.mu.RUnlock()
defer r.mu.RLock()
r.mu.Lock()
defer r.mu.Unlock()
delete(r.subscopes, key)
subscopeBucket.mu.RUnlock()
defer subscopeBucket.mu.RLock()
subscopeBucket.mu.Lock()
defer subscopeBucket.mu.Unlock()
delete(subscopeBucket.s, key)
}

0 comments on commit 7f86cc4

Please sign in to comment.