From 5a057f6049b86647d83b0622f3c8a22b54d3148e Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 6 Feb 2024 17:13:54 -0500 Subject: [PATCH 1/9] [internal/exp/metrics] Add a new internal package for handling metric staleness It's a glorified wrapper over a Map type, which allows values to be expired based on a pre-supplied interval. --- internal/exp/metrics/go.mod | 8 +- internal/exp/metrics/go.sum | 5 + .../exp/metrics/staleness/priority_queue.go | 97 ++++++++++++++ .../metrics/staleness/priority_queue_test.go | 111 ++++++++++++++++ internal/exp/metrics/staleness/staleness.go | 55 ++++++++ .../exp/metrics/staleness/staleness_test.go | 121 ++++++++++++++++++ 6 files changed, 395 insertions(+), 2 deletions(-) create mode 100644 internal/exp/metrics/staleness/priority_queue.go create mode 100644 internal/exp/metrics/staleness/priority_queue_test.go create mode 100644 internal/exp/metrics/staleness/staleness.go create mode 100644 internal/exp/metrics/staleness/staleness_test.go diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index f61b7e1e87af..ca0a23cb1e86 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -1,19 +1,22 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil +go 1.20 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/pdata v1.2.0 ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect @@ -21,8 +24,9 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) -go 1.21 +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil toolchain go1.21.1 diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 6f3b578693b2..1c3e08163635 100644 --- a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -16,6 +16,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -23,6 +25,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -74,5 +77,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go new file mode 100644 index 000000000000..8a26aabc06b8 --- /dev/null +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "container/heap" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type priorityQueueImpl []*queueItem + +type queueItem struct { + key identity.Stream + prio time.Time + index int +} + +func (pq priorityQueueImpl) Len() int { return len(pq) } + +func (pq priorityQueueImpl) Less(i, j int) bool { + // We want Pop to give us the lowest priority + return pq[i].prio.Before(pq[j].prio) +} + +func (pq priorityQueueImpl) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueueImpl) Push(x any) { + n := len(*pq) + item := x.(*queueItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueueImpl) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +func (pq *priorityQueueImpl) Update(item *queueItem, newPrio time.Time) { + item.prio = newPrio + heap.Fix(pq, item.index) +} + +type PriorityQueue struct { + inner priorityQueueImpl + itemLookup map[identity.Stream]*queueItem +} + +func NewPriorityQueue() *PriorityQueue { + pq := &PriorityQueue{ + inner: priorityQueueImpl{}, + itemLookup: map[identity.Stream]*queueItem{}, + } + heap.Init(&pq.inner) + + return pq +} + +func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) { + item, ok := pq.itemLookup[id] + if !ok { + item = &queueItem{ + key: id, + prio: newPrio, + } + heap.Push(&pq.inner, item) + pq.itemLookup[id] = item + } else { + pq.inner.Update(item, newPrio) + } +} + +func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) { + val := pq.inner[0] + return val.key, val.prio +} + +func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) { + val := heap.Pop(&pq.inner).(*queueItem) + return val.key, val.prio +} + +func (pq *PriorityQueue) Len() int { + return pq.inner.Len() +} diff --git a/internal/exp/metrics/staleness/priority_queue_test.go b/internal/exp/metrics/staleness/priority_queue_test.go new file mode 100644 index 000000000000..373c4189bb61 --- /dev/null +++ b/internal/exp/metrics/staleness/priority_queue_test.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +func TestPriorityQueueImpl(t *testing.T) { + t.Parallel() + + pq := NewPriorityQueue() + + idA := generateStreamID(t, map[string]any{ + "aaa": "123", + }) + idB := generateStreamID(t, map[string]any{ + "bbb": "456", + }) + idC := generateStreamID(t, map[string]any{ + "ccc": "789", + }) + + initialTime := time.Time{} + prioA := initialTime.Add(2 * time.Second) + prioB := initialTime.Add(1 * time.Second) + prioC := initialTime.Add(3 * time.Second) + + pq.Update(idA, prioA) + pq.Update(idB, prioB) + pq.Update(idC, prioC) + + // The first item should be B + id, prio := pq.Peek() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // If we peek again, nothing should change + id, prio = pq.Peek() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // Now if we peek again, it should be the next item + id, prio = pq.Peek() + require.Equal(t, idA, id) + require.Equal(t, prioA, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idA, id) + require.Equal(t, prioA, prio) + + // One last time + id, prio = pq.Peek() + require.Equal(t, idC, id) + require.Equal(t, prioC, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idC, id) + require.Equal(t, prioC, prio) + + // The queue should now be empty + require.Equal(t, 0, pq.Len()) +} + +func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream { + res := pcommon.NewResource() + err := res.Attributes().FromRaw(map[string]any{ + "foo": "bar", + "asdf": "qwer", + }) + require.NoError(t, err) + + scope := pcommon.NewInstrumentationScope() + scope.SetName("TestScope") + scope.SetVersion("v1.2.3") + err = scope.Attributes().FromRaw(map[string]any{ + "aaa": "bbb", + "ccc": "ddd", + }) + require.NoError(t, err) + + metric := pmetric.NewMetric() + + sum := metric.SetEmptySum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + dp := sum.DataPoints().AppendEmpty() + dp.SetStartTimestamp(678) + dp.SetTimestamp(789) + dp.SetDoubleValue(123.456) + err = dp.Attributes().FromRaw(attributes) + require.NoError(t, err) + + return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp) +} diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go new file mode 100644 index 000000000000..9e760c82f6cd --- /dev/null +++ b/internal/exp/metrics/staleness/staleness.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// We override how Now() is returned, so we can have deterministic tests +var nowFunc = time.Now + +type Map[T any] interface { + Load(key identity.Stream) (T, bool) + Store(key identity.Stream, value T) + Delete(key identity.Stream) + // Range calls f sequentially for each key and value present in the map. + // If f returns false, range stops the iteration. + Range(f func(key identity.Stream, value T) bool) +} + +type Staleness[T any] struct { + max time.Duration + + Map[T] + pq *PriorityQueue +} + +func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { + return &Staleness[T]{ + max: max, + Map: newMap, + pq: NewPriorityQueue(), + } +} + +func (s *Staleness[T]) Store(id identity.Stream, value T) { + s.pq.Update(id, nowFunc()) + s.Map.Store(id, value) +} + +func (s *Staleness[T]) ExpireOldEntries() { + now := nowFunc() + + for { + _, ts := s.pq.Peek() + if now.Sub(ts) < s.max { + break + } + id, _ := s.pq.Pop() + s.Map.Delete(id) + } +} diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go new file mode 100644 index 000000000000..b1fd20b1fdfa --- /dev/null +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -0,0 +1,121 @@ +package staleness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// RawMap +var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) + +type RawMap[K comparable, V any] map[K]V + +func (rm *RawMap[K, V]) Load(key K) (V, bool) { + value, ok := (*rm)[key] + return value, ok +} + +func (rm *RawMap[K, V]) Store(key K, value V) { + (*rm)[key] = value +} + +func (rm *RawMap[K, V]) Delete(key K) { + delete(*rm, key) +} + +func (rm *RawMap[K, V]) Range(f func(key K, value V) bool) { + for key, value := range *rm { + if !f(key, value) { + return + } + } +} + +// Tests + +func TestStaleness(t *testing.T) { + t.Parallel() + + max := 1 * time.Second + stalenessMap := NewStaleness[int]( + max, + &RawMap[identity.Stream, int]{}, + ) + + idA := generateStreamID(t, map[string]any{ + "aaa": "123", + }) + idB := generateStreamID(t, map[string]any{ + "bbb": "456", + }) + idC := generateStreamID(t, map[string]any{ + "ccc": "789", + }) + idD := generateStreamID(t, map[string]any{ + "ddd": "024", + }) + + initialTime := time.Time{} + timeA := initialTime.Add(2 * time.Second) + timeB := initialTime.Add(1 * time.Second) + timeC := initialTime.Add(3 * time.Second) + timeD := initialTime.Add(4 * time.Second) + + valueA := 1 + valueB := 4 + valueC := 7 + valueD := 0 + + // Add the values to the map + nowFunc = func() time.Time { return timeA } + stalenessMap.Store(idA, valueA) + nowFunc = func() time.Time { return timeB } + stalenessMap.Store(idB, valueB) + nowFunc = func() time.Time { return timeC } + stalenessMap.Store(idC, valueC) + nowFunc = func() time.Time { return timeD } + stalenessMap.Store(idD, valueD) + + // Set the time to 2.5s and run expire + // This should remove B, but the others should remain + // (now == 2.5s, B == 1s, max == 1s) + // now > B + max + nowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } + stalenessMap.ExpireOldEntries() + validateStalenessMapEntries(t, + map[identity.Stream]int{ + idA: valueA, + idC: valueC, + idD: valueD, + }, + stalenessMap, + ) + + // Set the time to 4.5s and run expire + // This should remove A and C, but D should remain + // (now == 2.5s, A == 2s, C == 3s, max == 1s) + // now > A + max AND now > C + max + nowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } + stalenessMap.ExpireOldEntries() + validateStalenessMapEntries(t, + map[identity.Stream]int{ + idD: valueD, + }, + stalenessMap, + ) +} + +func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) { + actual := map[identity.Stream]int{} + + sm.Range(func(key identity.Stream, value int) bool { + actual[key] = value + return true + }) + + require.Equal(t, expected, actual) +} From d0d9e9d791240f6243fb47eca03cdecc3b8240cb Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 13 Feb 2024 10:32:12 -0500 Subject: [PATCH 2/9] Properly clean up lookup map when popping --- internal/exp/metrics/staleness/priority_queue.go | 1 + internal/exp/metrics/staleness/priority_queue_test.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index 8a26aabc06b8..5e334be7b57d 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -89,6 +89,7 @@ func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) { func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) { val := heap.Pop(&pq.inner).(*queueItem) + delete(pq.itemLookup, val.key) return val.key, val.prio } diff --git a/internal/exp/metrics/staleness/priority_queue_test.go b/internal/exp/metrics/staleness/priority_queue_test.go index 373c4189bb61..b58478e7c1db 100644 --- a/internal/exp/metrics/staleness/priority_queue_test.go +++ b/internal/exp/metrics/staleness/priority_queue_test.go @@ -75,6 +75,11 @@ func TestPriorityQueueImpl(t *testing.T) { // The queue should now be empty require.Equal(t, 0, pq.Len()) + + // And the inner lookup map should also be empty + require.IsType(t, &heapPriorityQueue{}, pq) + heapQueue := pq.(*heapPriorityQueue) + require.Len(t, heapQueue.itemLookup, 0) } func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream { From 8d2bb899413e846603134197257f412bf8061607 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 13 Feb 2024 10:32:53 -0500 Subject: [PATCH 3/9] Make PriorityMap an interface So Staleness isn't so tightly tied to it --- .../exp/metrics/staleness/priority_queue.go | 39 +++++++++++-------- internal/exp/metrics/staleness/staleness.go | 2 +- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index 5e334be7b57d..e524ee112e88 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -10,7 +10,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) -type priorityQueueImpl []*queueItem +type heapQueue []*queueItem type queueItem struct { key identity.Stream @@ -18,27 +18,27 @@ type queueItem struct { index int } -func (pq priorityQueueImpl) Len() int { return len(pq) } +func (pq heapQueue) Len() int { return len(pq) } -func (pq priorityQueueImpl) Less(i, j int) bool { +func (pq heapQueue) Less(i, j int) bool { // We want Pop to give us the lowest priority return pq[i].prio.Before(pq[j].prio) } -func (pq priorityQueueImpl) Swap(i, j int) { +func (pq heapQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } -func (pq *priorityQueueImpl) Push(x any) { +func (pq *heapQueue) Push(x any) { n := len(*pq) item := x.(*queueItem) item.index = n *pq = append(*pq, item) } -func (pq *priorityQueueImpl) Pop() any { +func (pq *heapQueue) Pop() any { old := *pq n := len(old) item := old[n-1] @@ -48,19 +48,26 @@ func (pq *priorityQueueImpl) Pop() any { return item } -func (pq *priorityQueueImpl) Update(item *queueItem, newPrio time.Time) { +func (pq *heapQueue) Update(item *queueItem, newPrio time.Time) { item.prio = newPrio heap.Fix(pq, item.index) } -type PriorityQueue struct { - inner priorityQueueImpl +type PriorityQueue interface { + Update(id identity.Stream, newPrio time.Time) + Peek() (identity.Stream, time.Time) + Pop() (identity.Stream, time.Time) + Len() int +} + +type heapPriorityQueue struct { + inner heapQueue itemLookup map[identity.Stream]*queueItem } -func NewPriorityQueue() *PriorityQueue { - pq := &PriorityQueue{ - inner: priorityQueueImpl{}, +func NewPriorityQueue() PriorityQueue { + pq := &heapPriorityQueue{ + inner: heapQueue{}, itemLookup: map[identity.Stream]*queueItem{}, } heap.Init(&pq.inner) @@ -68,7 +75,7 @@ func NewPriorityQueue() *PriorityQueue { return pq } -func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) { +func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { item, ok := pq.itemLookup[id] if !ok { item = &queueItem{ @@ -82,17 +89,17 @@ func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) { } } -func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) { +func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) { val := pq.inner[0] return val.key, val.prio } -func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) { +func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) { val := heap.Pop(&pq.inner).(*queueItem) delete(pq.itemLookup, val.key) return val.key, val.prio } -func (pq *PriorityQueue) Len() int { +func (pq *heapPriorityQueue) Len() int { return pq.inner.Len() } diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 9e760c82f6cd..038ffceb76eb 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -25,7 +25,7 @@ type Staleness[T any] struct { max time.Duration Map[T] - pq *PriorityQueue + pq PriorityQueue } func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { From 4b08da9994172ab073c0853a3a90d17744df94a5 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 13 Feb 2024 10:37:51 -0500 Subject: [PATCH 4/9] Implement Map.LoadOrStore() --- internal/exp/metrics/staleness/staleness.go | 6 ++++++ internal/exp/metrics/staleness/staleness_test.go | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 038ffceb76eb..cf68946b9c59 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -13,8 +13,14 @@ import ( var nowFunc = time.Now type Map[T any] interface { + // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value Load(key identity.Stream) (T, bool) + // Store the given key value pair in the map Store(key identity.Stream, value T) + // LoadOrStore will either load the value from the map and return it and the boolean `true` + // or if it doesn't exist in the Map yet, the value passed in will be stored and then returned with the boolean `false` + LoadOrStore(key identity.Stream, value T) (T, bool) + // Remove the value at key from the map Delete(key identity.Stream) // Range calls f sequentially for each key and value present in the map. // If f returns false, range stops the iteration. diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index b1fd20b1fdfa..e2e4d009d156 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -19,6 +19,16 @@ func (rm *RawMap[K, V]) Load(key K) (V, bool) { return value, ok } +func (rm *RawMap[K, V]) LoadOrStore(key K, value V) (V, bool) { + returnedVal, ok := (*rm)[key] + if !ok { + (*rm)[key] = value + returnedVal = value + } + + return returnedVal, ok +} + func (rm *RawMap[K, V]) Store(key K, value V) { (*rm)[key] = value } From 9fecc8aa20918f119f1f53600cb61e86d9ed1cfb Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 14 Feb 2024 16:20:07 -0500 Subject: [PATCH 5/9] Use the new upcoming iterator style instead of our custom Range() --- internal/exp/metrics/staleness/staleness.go | 6 +++--- internal/exp/metrics/staleness/staleness_test.go | 14 ++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index cf68946b9c59..afbcaf37a522 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -22,9 +22,9 @@ type Map[T any] interface { LoadOrStore(key identity.Stream, value T) (T, bool) // Remove the value at key from the map Delete(key identity.Stream) - // Range calls f sequentially for each key and value present in the map. - // If f returns false, range stops the iteration. - Range(f func(key identity.Stream, value T) bool) + // Items returns an iterator function that in future go version can be used with range + // See: https://go.dev/wiki/RangefuncExperiment + Items() func(yield func(identity.Stream, T) bool) bool } type Staleness[T any] struct { diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index e2e4d009d156..363378ce8063 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -37,11 +37,14 @@ func (rm *RawMap[K, V]) Delete(key K) { delete(*rm, key) } -func (rm *RawMap[K, V]) Range(f func(key K, value V) bool) { - for key, value := range *rm { - if !f(key, value) { - return +func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { + return func(yield func(K, V) bool) bool { + for k, v := range *rm { + if !yield(k, v) { + break + } } + return false } } @@ -122,10 +125,9 @@ func TestStaleness(t *testing.T) { func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) { actual := map[identity.Stream]int{} - sm.Range(func(key identity.Stream, value int) bool { + sm.Items()(func(key identity.Stream, value int) bool { actual[key] = value return true }) - require.Equal(t, expected, actual) } From c52291085a6cb1ac01f6ce750d8f991186f6e1da Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 20 Feb 2024 15:21:34 -0500 Subject: [PATCH 6/9] Move Staleness map to be internal So users are forced to use the correct methods. Also adds lots of documentation --- internal/exp/metrics/staleness/map.go | 51 +++++++++++++++++++ .../exp/metrics/staleness/priority_queue.go | 36 +++++++------ internal/exp/metrics/staleness/staleness.go | 50 ++++++++++-------- .../exp/metrics/staleness/staleness_test.go | 41 --------------- 4 files changed, 100 insertions(+), 78 deletions(-) create mode 100644 internal/exp/metrics/staleness/map.go diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go new file mode 100644 index 000000000000..622f27ef1a21 --- /dev/null +++ b/internal/exp/metrics/staleness/map.go @@ -0,0 +1,51 @@ +package staleness + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// Map is an abstraction over a map +type Map[T any] interface { + // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value + Load(key identity.Stream) (T, bool) + // Store the given key value pair in the map + Store(key identity.Stream, value T) + // Remove the value at key from the map + Delete(key identity.Stream) + // Items returns an iterator function that in future go version can be used with range + // See: https://go.dev/wiki/RangefuncExperiment + Items() func(yield func(identity.Stream, T) bool) bool +} + +// RawMap implementation + +var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) + +// RawMap is an implementation of the Map interface using a standard golang map +type RawMap[K comparable, V any] map[K]V + +func (rm *RawMap[K, V]) Load(key K) (V, bool) { + value, ok := (*rm)[key] + return value, ok +} + +func (rm *RawMap[K, V]) Store(key K, value V) { + (*rm)[key] = value +} + +func (rm *RawMap[K, V]) Delete(key K) { + delete(*rm, key) +} + +func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { + return func(yield func(K, V) bool) bool { + for k, v := range *rm { + if !yield(k, v) { + break + } + } + return false + } +} diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index e524ee112e88..d43676f15416 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -10,6 +10,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) +// PriorityQueue represents a way to store entries sorted by their priority. +// Pop() will return the oldest entry of the set. +type PriorityQueue interface { + // Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted + Update(id identity.Stream, newPrio time.Time) + // Peek will return the entry at the HEAD of the queue *without* removing it from the queue + Peek() (identity.Stream, time.Time) + // Pop will remove the entry at the HEAD of the queue and return it + Pop() (identity.Stream, time.Time) + // Len will return the number of entries in the queue + Len() int +} + +// heapQueue implements heap.Interface. +// We use it as the inner implementation of a heap-based sorted queue type heapQueue []*queueItem type queueItem struct { @@ -48,18 +63,6 @@ func (pq *heapQueue) Pop() any { return item } -func (pq *heapQueue) Update(item *queueItem, newPrio time.Time) { - item.prio = newPrio - heap.Fix(pq, item.index) -} - -type PriorityQueue interface { - Update(id identity.Stream, newPrio time.Time) - Peek() (identity.Stream, time.Time) - Pop() (identity.Stream, time.Time) - Len() int -} - type heapPriorityQueue struct { inner heapQueue itemLookup map[identity.Stream]*queueItem @@ -76,16 +79,19 @@ func NewPriorityQueue() PriorityQueue { } func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { + // Check if the entry already exists in the queue item, ok := pq.itemLookup[id] - if !ok { + if ok { + // If so, we can update it in place + item.prio = newPrio + heap.Fix(&pq.inner, item.index) + } else { item = &queueItem{ key: id, prio: newPrio, } heap.Push(&pq.inner, item) pq.itemLookup[id] = item - } else { - pq.inner.Update(item, newPrio) } } diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index afbcaf37a522..481149b2f7b8 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -12,41 +12,47 @@ import ( // We override how Now() is returned, so we can have deterministic tests var nowFunc = time.Now -type Map[T any] interface { - // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value - Load(key identity.Stream) (T, bool) - // Store the given key value pair in the map - Store(key identity.Stream, value T) - // LoadOrStore will either load the value from the map and return it and the boolean `true` - // or if it doesn't exist in the Map yet, the value passed in will be stored and then returned with the boolean `false` - LoadOrStore(key identity.Stream, value T) (T, bool) - // Remove the value at key from the map - Delete(key identity.Stream) - // Items returns an iterator function that in future go version can be used with range - // See: https://go.dev/wiki/RangefuncExperiment - Items() func(yield func(identity.Stream, T) bool) bool -} - +// Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can +// call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is +// older than the `max` +// +// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded +// environment, then it is the user's responsibility to properly serialize calls to Staleness methods type Staleness[T any] struct { max time.Duration - Map[T] - pq PriorityQueue + items Map[T] + pq PriorityQueue } func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { return &Staleness[T]{ - max: max, - Map: newMap, - pq: NewPriorityQueue(), + max: max, + items: newMap, + pq: NewPriorityQueue(), } } +// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value +func (s *Staleness[T]) Load(key identity.Stream) (T, bool) { + return s.items.Load(key) +} + +// Store the given key value pair in the map, and update the pair's staleness value to "now" func (s *Staleness[T]) Store(id identity.Stream, value T) { s.pq.Update(id, nowFunc()) - s.Map.Store(id, value) + s.items.Store(id, value) +} + +// Items returns an iterator function that in future go version can be used with range +// See: https://go.dev/wiki/RangefuncExperiment +func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { + return s.items.Items() } +// ExpireOldEntries will remove all entries whose staleness value is older than `now() - max` +// For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would +// be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed. func (s *Staleness[T]) ExpireOldEntries() { now := nowFunc() @@ -56,6 +62,6 @@ func (s *Staleness[T]) ExpireOldEntries() { break } id, _ := s.pq.Pop() - s.Map.Delete(id) + s.items.Delete(id) } } diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 363378ce8063..00137d6e1ad5 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -9,47 +9,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) -// RawMap -var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) - -type RawMap[K comparable, V any] map[K]V - -func (rm *RawMap[K, V]) Load(key K) (V, bool) { - value, ok := (*rm)[key] - return value, ok -} - -func (rm *RawMap[K, V]) LoadOrStore(key K, value V) (V, bool) { - returnedVal, ok := (*rm)[key] - if !ok { - (*rm)[key] = value - returnedVal = value - } - - return returnedVal, ok -} - -func (rm *RawMap[K, V]) Store(key K, value V) { - (*rm)[key] = value -} - -func (rm *RawMap[K, V]) Delete(key K) { - delete(*rm, key) -} - -func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { - return func(yield func(K, V) bool) bool { - for k, v := range *rm { - if !yield(k, v) { - break - } - } - return false - } -} - -// Tests - func TestStaleness(t *testing.T) { t.Parallel() From 4b0be90795b8d6a733a4ebb6110d8f85f791bab3 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 20 Feb 2024 15:37:26 -0500 Subject: [PATCH 7/9] Fix linter / check errors --- internal/exp/metrics/go.mod | 4 +--- internal/exp/metrics/go.sum | 4 ++++ internal/exp/metrics/staleness/map.go | 5 ++++- internal/exp/metrics/staleness/priority_queue.go | 2 +- internal/exp/metrics/staleness/staleness.go | 2 +- internal/exp/metrics/staleness/staleness_test.go | 3 +++ 6 files changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index ca0a23cb1e86..6e54eb33f229 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics -go 1.20 +go 1.21 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 @@ -28,5 +28,3 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil - -toolchain go1.21.1 diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 1c3e08163635..9a5008b4d916 100644 --- a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -17,7 +17,9 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -26,6 +28,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -79,5 +82,6 @@ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go index 622f27ef1a21..77b29f5febd2 100644 --- a/internal/exp/metrics/staleness/map.go +++ b/internal/exp/metrics/staleness/map.go @@ -1,4 +1,7 @@ -package staleness +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" import ( "time" diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index d43676f15416..f1b01743f95f 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package staleness +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" import ( "container/heap" diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 481149b2f7b8..34c53e3702cd 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package staleness +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" import ( "time" diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 00137d6e1ad5..688a6c7b3c0b 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package staleness import ( From 4917d1ffed8465f0a0195ad5f576cd61c1f03a8e Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 21 Feb 2024 12:28:59 -0500 Subject: [PATCH 8/9] The Staleness tests can't be parallel, because they use the global `NowFunc` --- internal/exp/metrics/staleness/staleness_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 688a6c7b3c0b..4d96b41061e6 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -13,8 +13,6 @@ import ( ) func TestStaleness(t *testing.T) { - t.Parallel() - max := 1 * time.Second stalenessMap := NewStaleness[int]( max, @@ -46,20 +44,20 @@ func TestStaleness(t *testing.T) { valueD := 0 // Add the values to the map - nowFunc = func() time.Time { return timeA } + NowFunc = func() time.Time { return timeA } stalenessMap.Store(idA, valueA) - nowFunc = func() time.Time { return timeB } + NowFunc = func() time.Time { return timeB } stalenessMap.Store(idB, valueB) - nowFunc = func() time.Time { return timeC } + NowFunc = func() time.Time { return timeC } stalenessMap.Store(idC, valueC) - nowFunc = func() time.Time { return timeD } + NowFunc = func() time.Time { return timeD } stalenessMap.Store(idD, valueD) // Set the time to 2.5s and run expire // This should remove B, but the others should remain // (now == 2.5s, B == 1s, max == 1s) // now > B + max - nowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } + NowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } stalenessMap.ExpireOldEntries() validateStalenessMapEntries(t, map[identity.Stream]int{ @@ -74,7 +72,7 @@ func TestStaleness(t *testing.T) { // This should remove A and C, but D should remain // (now == 2.5s, A == 2s, C == 3s, max == 1s) // now > A + max AND now > C + max - nowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } + NowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } stalenessMap.ExpireOldEntries() validateStalenessMapEntries(t, map[identity.Stream]int{ From d0f1d14b7eccd0d7162904100ad15499ecb31dd1 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 21 Feb 2024 12:29:28 -0500 Subject: [PATCH 9/9] Expose `NowFunc` so other modules can do testing --- internal/exp/metrics/staleness/staleness.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 34c53e3702cd..f5803ccdeb55 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -10,7 +10,7 @@ import ( ) // We override how Now() is returned, so we can have deterministic tests -var nowFunc = time.Now +var NowFunc = time.Now // Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can // call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is @@ -40,7 +40,7 @@ func (s *Staleness[T]) Load(key identity.Stream) (T, bool) { // Store the given key value pair in the map, and update the pair's staleness value to "now" func (s *Staleness[T]) Store(id identity.Stream, value T) { - s.pq.Update(id, nowFunc()) + s.pq.Update(id, NowFunc()) s.items.Store(id, value) } @@ -54,7 +54,7 @@ func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { // For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would // be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed. func (s *Staleness[T]) ExpireOldEntries() { - now := nowFunc() + now := NowFunc() for { _, ts := s.pq.Peek()