-
Notifications
You must be signed in to change notification settings - Fork 544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MVP: Cost attribution #10269
base: main
Are you sure you want to change the base?
MVP: Cost attribution #10269
Changes from 31 commits
e315ebb
f04c28f
2c422d1
d2eab6b
1f39282
9b4337d
1a523e1
f10f787
cc0e939
c020be0
71e4666
9dd101b
7d4ea9a
6754666
fffc5b3
2cf8c3e
f994034
b060c09
e35a8d9
5cc0b5d
389dff0
116a69e
9c30445
88ef49e
130636a
b701ba7
dccd9c8
eebd028
8386503
d8f1e9b
b9efb94
a37e6de
211b3a2
f697e6f
fe8a1e5
8b5836f
888d8b0
b15b487
87209d6
4706bde
1ab1f00
8111b6c
17b64a9
a191044
2bb1845
ddd507d
b27e379
a79fac7
37901b7
f7115f4
679f2cc
66accc9
f4a4efd
f90ac0e
1ab89c5
23b32cf
7a60c7d
0287bf6
9c4c2df
f8f2a49
1ad99ad
fa62ee1
1b0fb00
ced8346
0a7c858
4336f7f
67b6cea
800fe85
80e69fb
a2ffe5a
f28d672
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package costattribution | ||
|
||
import ( | ||
"context" | ||
"sort" | ||
"sync" | ||
"time" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/grafana/dskit/services" | ||
"github.com/prometheus/client_golang/prometheus" | ||
|
||
"github.com/grafana/mimir/pkg/util/validation" | ||
) | ||
|
||
const ( | ||
trackerLabel = "tracker" | ||
tenantLabel = "tenant" | ||
defaultTrackerName = "cost-attribution" | ||
missingValue = "__missing__" | ||
overflowValue = "__overflow__" | ||
) | ||
|
||
type Manager struct { | ||
services.Service | ||
logger log.Logger | ||
inactiveTimeout time.Duration | ||
limits *validation.Overrides | ||
|
||
mtx sync.RWMutex | ||
trackersByUserID map[string]*Tracker | ||
reg *prometheus.Registry | ||
cleanupInterval time.Duration | ||
metricsExportInterval time.Duration | ||
} | ||
|
||
func NewManager(cleanupInterval, exportInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) { | ||
m := &Manager{ | ||
trackersByUserID: make(map[string]*Tracker), | ||
limits: limits, | ||
mtx: sync.RWMutex{}, | ||
inactiveTimeout: inactiveTimeout, | ||
logger: logger, | ||
reg: reg, | ||
cleanupInterval: cleanupInterval, | ||
metricsExportInterval: exportInterval, | ||
} | ||
|
||
m.Service = services.NewTimerService(cleanupInterval, nil, m.iteration, nil).WithName("cost attribution manager") | ||
if err := reg.Register(m); err != nil { | ||
return nil, err | ||
} | ||
return m, nil | ||
} | ||
|
||
func (m *Manager) iteration(_ context.Context) error { | ||
return m.purgeInactiveAttributionsUntil(time.Now().Add(-m.inactiveTimeout).Unix()) | ||
} | ||
|
||
func (m *Manager) EnabledForUser(userID string) bool { | ||
if m == nil { | ||
return false | ||
} | ||
return len(m.limits.CostAttributionLabels(userID)) > 0 | ||
} | ||
|
||
func (m *Manager) Tracker(userID string) *Tracker { | ||
if !m.EnabledForUser(userID) { | ||
return nil | ||
} | ||
|
||
// Check if the tracker already exists, if exists return it. Otherwise lock and create a new tracker. | ||
m.mtx.RLock() | ||
tracker, exists := m.trackersByUserID[userID] | ||
m.mtx.RUnlock() | ||
if exists { | ||
return tracker | ||
} | ||
|
||
m.mtx.Lock() | ||
defer m.mtx.Unlock() | ||
if tracker, exists = m.trackersByUserID[userID]; exists { | ||
return tracker | ||
} | ||
tracker = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but I would collect all the information needed to build a tracker (all the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in commit 679f2cc |
||
m.trackersByUserID[userID] = tracker | ||
return tracker | ||
} | ||
|
||
func (m *Manager) Collect(out chan<- prometheus.Metric) { | ||
m.mtx.RLock() | ||
defer m.mtx.RUnlock() | ||
for _, tracker := range m.trackersByUserID { | ||
tracker.Collect(out) | ||
} | ||
} | ||
|
||
func (m *Manager) Describe(chan<- *prometheus.Desc) { | ||
colega marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Describe is not implemented because the metrics include dynamic labels. The Manager functions as an unchecked exporter. | ||
// For more details, refer to the documentation: https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics | ||
} | ||
|
||
func (m *Manager) deleteTracker(userID string) { | ||
m.mtx.Lock() | ||
defer m.mtx.Unlock() | ||
delete(m.trackersByUserID, userID) | ||
} | ||
|
||
func (m *Manager) updateTracker(userID string) *Tracker { | ||
t := m.Tracker(userID) | ||
|
||
if t == nil { | ||
m.deleteTracker(userID) | ||
return nil | ||
} | ||
|
||
newTrackedLabels := m.limits.CostAttributionLabels(userID) | ||
|
||
// sort the labels to ensure the order is consistent | ||
sort.Slice(newTrackedLabels, func(i, j int) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it safe to modify the slice returned by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. modify the copy instead, addressed in 888d8b0 |
||
return newTrackedLabels[i] < newTrackedLabels[j] | ||
}) | ||
ying-jeanne marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// if the labels have changed or the max cardinality or cooldown duration have changed, create a new tracker | ||
if !t.hasSameLabels(newTrackedLabels) || t.maxCardinality != m.limits.MaxCostAttributionCardinalityPerUser(userID) || t.cooldownDuration != int64(m.limits.CostAttributionCooldown(userID).Seconds()) { | ||
m.mtx.Lock() | ||
t = newTracker(userID, newTrackedLabels, m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger) | ||
m.trackersByUserID[userID] = t | ||
m.mtx.Unlock() | ||
return t | ||
} | ||
|
||
return t | ||
} | ||
|
||
func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error { | ||
m.mtx.RLock() | ||
userIDs := make([]string, 0, len(m.trackersByUserID)) | ||
for userID := range m.trackersByUserID { | ||
userIDs = append(userIDs, userID) | ||
} | ||
m.mtx.RUnlock() | ||
|
||
for _, userID := range userIDs { | ||
t := m.updateTracker(userID) | ||
if t == nil { | ||
continue | ||
} | ||
|
||
invalidKeys := t.inactiveObservations(deadline) | ||
for _, key := range invalidKeys { | ||
t.cleanupTrackerAttribution(key) | ||
} | ||
|
||
if t.shouldDelete(deadline) { | ||
m.deleteTracker(userID) | ||
ying-jeanne marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this doesn't need to be exported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in commit 679f2cc