Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metric] #3 Reset the declared_resources metric when at a new commit [restored] #1232

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion e2e/testcases/otel_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestGCMMetrics(t *testing.T) {
if err != nil {
nt.T.Fatal(err)
}
_, err = retry.Retry(60*time.Second, func() error {
_, err = retry.Retry(120*time.Second, func() error {
var err error
for _, metricType := range GCMMetricTypes {
descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
Expand All @@ -270,6 +270,54 @@ func TestGCMMetrics(t *testing.T) {
if err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Adding test namespace")
namespace := fake.NamespaceObject("foo")
nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace))
nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace"))
if err := nt.WatchForAllSyncs(); err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Checking resource related metrics after adding test resource")
resourceMetrics := []string{
csmetrics.DeclaredResourcesName,
rgmetrics.ResourceCountName,
rgmetrics.ReadyResourceCountName,
}
_, err = retry.Retry(120*time.Second, func() error {
var err error
for _, metricType := range resourceMetrics {
descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
err = multierr.Append(err, validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasValue(3)))
}
return err
})
if err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Remove the test resource")
nt.Must(nt.RootRepos[configsync.RootSyncName].Remove("acme/ns.yaml"))
nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Remove the test namespace"))
if err := nt.WatchForAllSyncs(); err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Checking resource related metrics after removing test resource")
_, err = retry.Retry(120*time.Second, func() error {
var err error
for _, metricType := range resourceMetrics {
descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
err = multierr.Append(err, validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasLastestValue(2)))
}
return err
})
if err != nil {
nt.T.Fatal(err)
}
}

// TestOtelCollectorGCMLabelAggregation validates that Google Cloud Monitoring
Expand Down Expand Up @@ -451,6 +499,29 @@ func metricDoesNotHaveLabel(label string) metricValidatorFunc {
}
}

func metricHasValue(value int64) metricValidatorFunc {
return func(series *monitoringpb.TimeSeries) error {
points := series.GetPoints()
for _, point := range points {
if point.GetValue().GetInt64Value() == value {
return nil
}
}
return fmt.Errorf("expected metric to have value %v but did not find in response", value)
}
}

func metricHasLastestValue(value int64) metricValidatorFunc {
return func(series *monitoringpb.TimeSeries) error {
points := series.GetPoints()
lastPoint := points[len(points)-1]
if lastPoint.GetValue().GetInt64Value() == value {
return nil
}
return fmt.Errorf("expected metric to have latest value %v but did not find in response", value)
}
}

// Validates a metricType from a specific cluster_name can be found within given
// TimeSeries
func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string, valFns ...metricValidatorFunc) error {
Expand Down
90 changes: 40 additions & 50 deletions pkg/declared/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ type Resources struct {
objectMap *orderedmap.OrderedMap[core.ID, *unstructured.Unstructured]
// commit of the source in which the resources were declared
commit string
// previousCommit is the preceding commit to the commit
previousCommit string
tiffanny29631 marked this conversation as resolved.
Show resolved Hide resolved
}

// Update performs an atomic update on the resource declaration set.
func (r *Resources) Update(ctx context.Context, objects []client.Object, commit string) ([]client.Object, status.Error) {
r.mutex.Lock()
defer r.mutex.Unlock()
// First build up the new map using a local pointer/reference.
newSet := orderedmap.NewOrderedMap[core.ID, *unstructured.Unstructured]()
newObjects := []client.Object{}
Expand All @@ -68,95 +72,81 @@ func (r *Resources) Update(ctx context.Context, objects []client.Object, commit

// Record the declared_resources metric, after parsing but before validation.
metrics.RecordDeclaredResources(ctx, commit, len(newObjects))
if r.previousCommit != commit && r.previousCommit != "" {
// For Cloud Monitoring, we have configured otel-collector to remove the
// commit label, to reduce cardinality, but this aggregation uses the max
// value (b/321875474). So in order for the latest commit to be chosen as
// the max value, we reset the previous commit value to zero.
// TODO: Remove this workaround after migrating to the otel-collector metrics client and switching from gauge to async gauge
metrics.RecordDeclaredResources(ctx, r.previousCommit, 0)
}

previousSet, _ := r.getObjectMap()
if err := deletesAllNamespaces(previousSet, newSet); err != nil {
if err := deletesAllNamespaces(r.objectMap, newSet); err != nil {
return nil, err
}

// Now assign the pointer for the new map to the struct reference in a
// threadsafe context. From now on, this map is read-only.
r.setObjectMap(newSet, commit)
r.previousCommit = commit
r.objectMap = newSet
r.commit = commit
return newObjects, nil
}

// Get returns a copy of the resource declaration as read from Git
func (r *Resources) Get(id core.ID) (*unstructured.Unstructured, string, bool) {
objSet, commit := r.getObjectMap()
if objSet == nil || objSet.Len() == 0 {
return nil, commit, false
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.objectMap == nil || r.objectMap.Len() == 0 {
return nil, r.commit, false
}

// A local reference to the map is threadsafe since only the struct reference
// is replaced on update.
u, found := objSet.Get(id)
u, found := r.objectMap.Get(id)
// We return a copy of the Unstructured, as
// 1) client.Client methods mutate the objects passed into them.
// 2) We don't want to persist any changes made to an object we retrieved
// from a declared.Resources.
return u.DeepCopy(), commit, found
return u.DeepCopy(), r.commit, found
}

// DeclaredUnstructureds returns all resource objects declared in the source,
// along with the source commit.
func (r *Resources) DeclaredUnstructureds() ([]*unstructured.Unstructured, string) {
objSet, commit := r.getObjectMap()
if objSet == nil || objSet.Len() == 0 {
return nil, commit
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.objectMap == nil || r.objectMap.Len() == 0 {
return nil, r.commit
}

// A local reference to the map is threadsafe since only the struct reference
// is replaced on update.
var objects []*unstructured.Unstructured
for pair := objSet.Front(); pair != nil; pair = pair.Next() {
for pair := r.objectMap.Front(); pair != nil; pair = pair.Next() {
objects = append(objects, pair.Value)
}
return objects, commit
return objects, r.commit
}

// DeclaredObjects returns all resource objects declared in the source, along
// with the source commit.
func (r *Resources) DeclaredObjects() ([]client.Object, string) {
objSet, commit := r.getObjectMap()
if objSet == nil || objSet.Len() == 0 {
return nil, commit
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.objectMap == nil || r.objectMap.Len() == 0 {
return nil, r.commit
}

// A local reference to the map is threadsafe since only the struct reference
// is replaced on update.
var objects []client.Object
for pair := objSet.Front(); pair != nil; pair = pair.Next() {
for pair := r.objectMap.Front(); pair != nil; pair = pair.Next() {
objects = append(objects, pair.Value)
}
return objects, commit
return objects, r.commit
}

// DeclaredGVKs returns the set of all GroupVersionKind found in the source,
// along with the source commit.
func (r *Resources) DeclaredGVKs() (map[schema.GroupVersionKind]struct{}, string) {
objSet, commit := r.getObjectMap()
if objSet == nil || objSet.Len() == 0 {
return nil, commit
r.mutex.RLock()
defer r.mutex.RUnlock()
if r.objectMap == nil || r.objectMap.Len() == 0 {
return nil, r.commit
}

// A local reference to the objSet map is threadsafe since only the pointer to
// the map is replaced on update.
gvkSet := make(map[schema.GroupVersionKind]struct{})
for pair := objSet.Front(); pair != nil; pair = pair.Next() {
for pair := r.objectMap.Front(); pair != nil; pair = pair.Next() {
gvkSet[pair.Value.GroupVersionKind()] = struct{}{}
}
return gvkSet, commit
}

func (r *Resources) getObjectMap() (*orderedmap.OrderedMap[core.ID, *unstructured.Unstructured], string) {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.objectMap, r.commit
}

func (r *Resources) setObjectMap(objectMap *orderedmap.OrderedMap[core.ID, *unstructured.Unstructured], commit string) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.objectMap = objectMap
r.commit = commit
return gvkSet, r.commit
}