From c8a01452726a4385b7de6dca81120efd1be58ef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 7 Apr 2020 15:05:08 +0200 Subject: [PATCH 1/6] Store last error instead of status changes. --- pkg/autoscaler/metrics/collector.go | 42 ++++++--- pkg/autoscaler/metrics/collector_test.go | 111 +++++++---------------- 2 files changed, 63 insertions(+), 90 deletions(-) diff --git a/pkg/autoscaler/metrics/collector.go b/pkg/autoscaler/metrics/collector.go index 940e6a8e14f8..4b4e1b5897f6 100644 --- a/pkg/autoscaler/metrics/collector.go +++ b/pkg/autoscaler/metrics/collector.go @@ -143,7 +143,7 @@ func (c *MetricCollector) CreateOrUpdate(metric *av1alpha1.Metric) error { if exists { collection.updateScraper(scraper) collection.updateMetric(metric) - return nil + return collection.lastError() } c.collectionsMutex.Lock() @@ -153,7 +153,7 @@ func (c *MetricCollector) CreateOrUpdate(metric *av1alpha1.Metric) error { if exists { collection.updateScraper(scraper) collection.updateMetric(metric) - return nil + return collection.lastError() } c.collections[key] = newCollection(metric, scraper, c.tickProvider, c.logger) @@ -224,6 +224,9 @@ type collection struct { metricMutex sync.RWMutex metric *av1alpha1.Metric + errMutex sync.RWMutex + lastErr error + scraperMutex sync.RWMutex scraper StatsScraper concurrencyBuckets *aggregation.TimedFloat64Buckets @@ -282,21 +285,17 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory f currentMetric := c.currentMetric() if currentMetric.Spec.ScrapeTarget == "" { // Don't scrape empty target service. + if c.updateLastError(nil) { + // Notify reconciler. + } continue } stat, err := c.getScraper().Scrape(currentMetric.Spec.StableWindow) if err != nil { - copy := metric.DeepCopy() - switch { - case err == ErrFailedGetEndpoints: - copy.Status.MarkMetricNotReady("NoEndpoints", ErrFailedGetEndpoints.Error()) - case err == ErrDidNotReceiveStat: - copy.Status.MarkMetricFailed("DidNotReceiveStat", ErrDidNotReceiveStat.Error()) - default: - copy.Status.MarkMetricNotReady("CreateOrUpdateFailed", "Collector has failed.") - } logger.Errorw("Failed to scrape metrics", zap.Error(err)) - c.updateMetric(copy) + } + if c.updateLastError(err) { + // Notify reconciler. } if stat != emptyStat { c.record(stat) @@ -328,6 +327,25 @@ func (c *collection) currentMetric() *av1alpha1.Metric { return c.metric } +// updateLastError updates the last error returned from the scraper. +func (c *collection) updateLastError(err error) bool { + c.errMutex.Lock() + defer c.errMutex.Unlock() + + if c.lastErr == err { + return false + } + c.lastErr = err + return true +} + +func (c *collection) lastError() error { + c.errMutex.RLock() + defer c.errMutex.RUnlock() + + return c.lastErr +} + // record adds a stat to the current collection. func (c *collection) record(stat Stat) { // Proxied requests have been counted at the activator. Subtract diff --git a/pkg/autoscaler/metrics/collector_test.go b/pkg/autoscaler/metrics/collector_test.go index 109901ea3840..ff523f1c746f 100644 --- a/pkg/autoscaler/metrics/collector_test.go +++ b/pkg/autoscaler/metrics/collector_test.go @@ -23,12 +23,9 @@ import ( "time" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - duckv1 "knative.dev/pkg/apis/duck/v1" . "knative.dev/pkg/logging/testing" av1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" "knative.dev/serving/pkg/apis/serving" @@ -387,11 +384,25 @@ func TestMetricCollectorRecord(t *testing.T) { } func TestMetricCollectorError(t *testing.T) { + testMetric := &av1alpha1.Metric{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: fake.TestNamespace, + Name: fake.TestRevision, + Labels: map[string]string{ + serving.RevisionLabelKey: fake.TestRevision, + }, + }, + Spec: av1alpha1.MetricSpec{ + ScrapeTarget: fake.TestRevision + "-zhudex", + }, + } + + errOther := errors.New("foo") + testCases := []struct { - name string - scraper *testScraper - metric *av1alpha1.Metric - expectedMetricStatus duckv1.Status + name string + scraper *testScraper + expectedError error }{{ name: "Failed to get endpoints scraper error", scraper: &testScraper{ @@ -399,26 +410,7 @@ func TestMetricCollectorError(t *testing.T) { return emptyStat, ErrFailedGetEndpoints }, }, - metric: &av1alpha1.Metric{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: fake.TestNamespace, - Name: fake.TestRevision, - Labels: map[string]string{ - serving.RevisionLabelKey: fake.TestRevision, - }, - }, - Spec: av1alpha1.MetricSpec{ - ScrapeTarget: fake.TestRevision + "-zhudex", - }, - }, - expectedMetricStatus: duckv1.Status{ - Conditions: duckv1.Conditions{{ - Type: av1alpha1.MetricConditionReady, - Status: corev1.ConditionUnknown, - Reason: "NoEndpoints", - Message: ErrFailedGetEndpoints.Error(), - }}, - }, + expectedError: ErrFailedGetEndpoints, }, { name: "Did not receive stat scraper error", scraper: &testScraper{ @@ -426,53 +418,15 @@ func TestMetricCollectorError(t *testing.T) { return emptyStat, ErrDidNotReceiveStat }, }, - metric: &av1alpha1.Metric{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: fake.TestNamespace, - Name: fake.TestRevision, - Labels: map[string]string{ - serving.RevisionLabelKey: fake.TestRevision, - }, - }, - Spec: av1alpha1.MetricSpec{ - ScrapeTarget: fake.TestRevision + "-zhudex", - }, - }, - expectedMetricStatus: duckv1.Status{ - Conditions: duckv1.Conditions{{ - Type: av1alpha1.MetricConditionReady, - Status: corev1.ConditionFalse, - Reason: "DidNotReceiveStat", - Message: ErrDidNotReceiveStat.Error(), - }}, - }, + expectedError: ErrDidNotReceiveStat, }, { name: "Other scraper error", scraper: &testScraper{ s: func() (Stat, error) { - return emptyStat, errors.New("foo") + return emptyStat, errOther }, }, - metric: &av1alpha1.Metric{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: fake.TestNamespace, - Name: fake.TestRevision, - Labels: map[string]string{ - serving.RevisionLabelKey: fake.TestRevision, - }, - }, - Spec: av1alpha1.MetricSpec{ - ScrapeTarget: fake.TestRevision + "-zhudex", - }, - }, - expectedMetricStatus: duckv1.Status{ - Conditions: duckv1.Conditions{{ - Type: av1alpha1.MetricConditionReady, - Status: corev1.ConditionUnknown, - Reason: "CreateOrUpdateFailed", - Message: "Collector has failed.", - }}, - }, + expectedError: errOther, }} logger := TestLogger(t) @@ -480,22 +434,23 @@ func TestMetricCollectorError(t *testing.T) { t.Run(test.name, func(t *testing.T) { factory := scraperFactory(test.scraper, nil) coll := NewMetricCollector(factory, logger) - coll.CreateOrUpdate(test.metric) - key := types.NamespacedName{Namespace: test.metric.Namespace, Name: test.metric.Name} + coll.CreateOrUpdate(testMetric) + key := types.NamespacedName{Namespace: testMetric.Namespace, Name: testMetric.Name} - var got duckv1.Status wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { collection, ok := coll.collections[key] - if ok { - got = collection.currentMetric().Status.Status - return equality.Semantic.DeepEqual(got, test.expectedMetricStatus), nil + if !ok { + return false, nil } - return false, nil + return collection.lastError() == test.expectedError, nil }) - if !equality.Semantic.DeepEqual(got, test.expectedMetricStatus) { - t.Errorf("Got = %#v, want: %#v, diff:\n%q", got, test.expectedMetricStatus, cmp.Diff(got, test.expectedMetricStatus)) + + // Simulate a reconcile. + if err := coll.CreateOrUpdate(testMetric); err != test.expectedError { + t.Fatalf("lastError = %v, want %v", err, test.expectedError) } - coll.Delete(test.metric.Namespace, test.metric.Name) + + coll.Delete(testMetric.Namespace, testMetric.Name) }) } } From 6067d6179737b61bfe985bd6d5fa04b0da897ce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 7 Apr 2020 15:26:19 +0200 Subject: [PATCH 2/6] Set status in the reconciler accordingly. --- pkg/reconciler/metric/metric.go | 15 +++++++--- pkg/reconciler/metric/metric_test.go | 44 +++++++++++++++++++++------- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/pkg/reconciler/metric/metric.go b/pkg/reconciler/metric/metric.go index ab7683d6ad11..4e05d603bd78 100644 --- a/pkg/reconciler/metric/metric.go +++ b/pkg/reconciler/metric/metric.go @@ -18,7 +18,6 @@ package metric import ( "context" - "fmt" "knative.dev/serving/pkg/apis/autoscaling/v1alpha1" "knative.dev/serving/pkg/autoscaler/metrics" @@ -40,9 +39,17 @@ func (r *reconciler) ReconcileKind(ctx context.Context, metric *v1alpha1.Metric) metric.Status.InitializeConditions() if err := r.collector.CreateOrUpdate(metric); err != nil { - // If create or update fails, we won't be able to collect at all. - metric.Status.MarkMetricFailed("CollectionFailed", "Failed to reconcile metric collection") - return fmt.Errorf("failed to initiate or update scraping: %w", err) + switch err { + case metrics.ErrFailedGetEndpoints: + metric.Status.MarkMetricNotReady("NoEndpoints", err.Error()) + case metrics.ErrDidNotReceiveStat: + metric.Status.MarkMetricFailed("DidNotReceiveStat", err.Error()) + default: + metric.Status.MarkMetricFailed("CollectionFailed", "Failed to reconcile metric collection: "+err.Error()) + } + + // We don't return an error because retrying is of no use. We'll be poked by collector on a change. + return nil } metric.Status.MarkMetricReady() diff --git a/pkg/reconciler/metric/metric_test.go b/pkg/reconciler/metric/metric_test.go index 72b6877f0661..456e45cf0dbb 100644 --- a/pkg/reconciler/metric/metric_test.go +++ b/pkg/reconciler/metric/metric_test.go @@ -121,15 +121,10 @@ func TestReconcile(t *testing.T) { Objects: []runtime.Object{ metric("bad", "collector"), }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", - "failed to initiate or update scraping: the-error"), - }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: metric("bad", "collector", failed("CollectionFailed", - "Failed to reconcile metric collection")), + "Failed to reconcile metric collection: the-error")), }}, - WantErr: true, }, { Name: "cannot create collection-part II", Ctx: context.WithValue(context.Background(), collectorKey{}, @@ -138,13 +133,34 @@ func TestReconcile(t *testing.T) { Key: "bad/collector", Objects: []runtime.Object{ metric("bad", "collector", failed("CollectionFailed", - "Failed to reconcile metric collection")), + "Failed to reconcile metric collection: the-error")), }, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", - "failed to initiate or update scraping: the-error"), + }, { + Name: "no endpoints error", + Ctx: context.WithValue(context.Background(), collectorKey{}, + &testCollector{createOrUpdateError: metrics.ErrFailedGetEndpoints}, + ), + Key: "bad/collector", + Objects: []runtime.Object{ + metric("bad", "collector"), }, - WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: metric("bad", "collector", unknown("NoEndpoints", + metrics.ErrFailedGetEndpoints.Error())), + }}, + }, { + Name: "no stats error", + Ctx: context.WithValue(context.Background(), collectorKey{}, + &testCollector{createOrUpdateError: metrics.ErrDidNotReceiveStat}, + ), + Key: "bad/collector", + Objects: []runtime.Object{ + metric("bad", "collector"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: metric("bad", "collector", failed("DidNotReceiveStat", + metrics.ErrDidNotReceiveStat.Error())), + }}, }} table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { @@ -212,6 +228,12 @@ func failed(r, m string) metricOption { } } +func unknown(r, m string) metricOption { + return func(metric *av1alpha1.Metric) { + metric.Status.MarkMetricNotReady(r, m) + } +} + func ready(m *av1alpha1.Metric) { m.Status.MarkMetricReady() } From 693790e677e3263ac395494ab6a8123347494600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 7 Apr 2020 16:08:56 +0200 Subject: [PATCH 3/6] Implement Watch mechanism. --- pkg/autoscaler/metrics/collector.go | 40 +++++++++++++++++++++++----- pkg/reconciler/metric/controller.go | 2 ++ pkg/reconciler/metric/metric_test.go | 2 ++ 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/pkg/autoscaler/metrics/collector.go b/pkg/autoscaler/metrics/collector.go index 4b4e1b5897f6..5b6ea1276261 100644 --- a/pkg/autoscaler/metrics/collector.go +++ b/pkg/autoscaler/metrics/collector.go @@ -18,7 +18,6 @@ package metrics import ( "errors" - "fmt" "sync" "time" @@ -90,6 +89,8 @@ type Collector interface { Record(key types.NamespacedName, stat Stat) // Delete deletes a Metric and halts collection. Delete(string, string) error + // Watch registers a singleton function to call when collector status changes. + Watch(func(types.NamespacedName)) } // MetricClient surfaces the metrics that can be obtained via the collector. @@ -112,6 +113,9 @@ type MetricCollector struct { collections map[types.NamespacedName]*collection collectionsMutex sync.RWMutex + + watcherMutex sync.RWMutex + watcher func(types.NamespacedName) } var _ Collector = (*MetricCollector)(nil) @@ -156,7 +160,7 @@ func (c *MetricCollector) CreateOrUpdate(metric *av1alpha1.Metric) error { return collection.lastError() } - c.collections[key] = newCollection(metric, scraper, c.tickProvider, c.logger) + c.collections[key] = newCollection(metric, scraper, c.tickProvider, c.Inform, c.logger) return nil } @@ -183,6 +187,27 @@ func (c *MetricCollector) Record(key types.NamespacedName, stat Stat) { } } +// Watch registers a singleton function to call when collector status changes. +func (c *MetricCollector) Watch(fn func(types.NamespacedName)) { + c.watcherMutex.Lock() + defer c.watcherMutex.Unlock() + + if c.watcher != nil { + c.logger.Fatal("Multiple calls to Watch() not supported") + } + c.watcher = fn +} + +// Inform sends an update to the registered watcher function, if it is set. +func (c *MetricCollector) Inform(event types.NamespacedName) { + c.watcherMutex.RLock() + defer c.watcherMutex.RUnlock() + + if c.watcher != nil { + c.watcher(event) + } +} + // StableAndPanicConcurrency returns both the stable and the panic concurrency. // It may truncate metric buckets as a side-effect. func (c *MetricCollector) StableAndPanicConcurrency(key types.NamespacedName, now time.Time) (float64, float64, error) { @@ -252,7 +277,8 @@ func (c *collection) getScraper() StatsScraper { // newCollection creates a new collection, which uses the given scraper to // collect stats every scrapeTickInterval. -func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory func(time.Duration) *time.Ticker, logger *zap.SugaredLogger) *collection { +func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory func(time.Duration) *time.Ticker, + callback func(types.NamespacedName), logger *zap.SugaredLogger) *collection { c := &collection{ metric: metric, concurrencyBuckets: aggregation.NewTimedFloat64Buckets( @@ -268,8 +294,8 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory f stopCh: make(chan struct{}), } - logger = logger.Named("collector").With( - zap.String(logkey.Key, fmt.Sprintf("%s/%s", metric.Namespace, metric.Name))) + key := types.NamespacedName{Namespace: metric.Namespace, Name: metric.Name} + logger = logger.Named("collector").With(zap.String(logkey.Key, key.String())) c.grp.Add(1) go func() { @@ -286,7 +312,7 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory f if currentMetric.Spec.ScrapeTarget == "" { // Don't scrape empty target service. if c.updateLastError(nil) { - // Notify reconciler. + callback(key) } continue } @@ -295,7 +321,7 @@ func newCollection(metric *av1alpha1.Metric, scraper StatsScraper, tickFactory f logger.Errorw("Failed to scrape metrics", zap.Error(err)) } if c.updateLastError(err) { - // Notify reconciler. + callback(key) } if stat != emptyStat { c.record(stat) diff --git a/pkg/reconciler/metric/controller.go b/pkg/reconciler/metric/controller.go index b6a05b81ac46..8f1ddd3a18fd 100644 --- a/pkg/reconciler/metric/controller.go +++ b/pkg/reconciler/metric/controller.go @@ -66,5 +66,7 @@ func NewController( }, }) + collector.Watch(impl.EnqueueKey) + return impl } diff --git a/pkg/reconciler/metric/metric_test.go b/pkg/reconciler/metric/metric_test.go index 456e45cf0dbb..eb774af4615c 100644 --- a/pkg/reconciler/metric/metric_test.go +++ b/pkg/reconciler/metric/metric_test.go @@ -285,3 +285,5 @@ func (c *testCollector) Delete(namespace, name string) error { } return c.deleteError } + +func (c *testCollector) Watch(func(types.NamespacedName)) {} From 9ddd281617408ad352e6efaab0e67d87067ef58b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 7 Apr 2020 16:24:28 +0200 Subject: [PATCH 4/6] Use watch mechanism in tests. --- pkg/autoscaler/metrics/collector_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/autoscaler/metrics/collector_test.go b/pkg/autoscaler/metrics/collector_test.go index ff523f1c746f..52fa0ea35b47 100644 --- a/pkg/autoscaler/metrics/collector_test.go +++ b/pkg/autoscaler/metrics/collector_test.go @@ -434,16 +434,18 @@ func TestMetricCollectorError(t *testing.T) { t.Run(test.name, func(t *testing.T) { factory := scraperFactory(test.scraper, nil) coll := NewMetricCollector(factory, logger) + watchCh := make(chan types.NamespacedName) + coll.Watch(func(key types.NamespacedName) { + watchCh <- key + }) coll.CreateOrUpdate(testMetric) key := types.NamespacedName{Namespace: testMetric.Namespace, Name: testMetric.Name} - wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) { - collection, ok := coll.collections[key] - if !ok { - return false, nil - } - return collection.lastError() == test.expectedError, nil - }) + // Expect an event to be propagated because we're erroring. + event := <-watchCh + if event != key { + t.Fatalf("Event = %v, want %v", event, key) + } // Simulate a reconcile. if err := coll.CreateOrUpdate(testMetric); err != test.expectedError { From 3477cf9aa14a7036469416d21d67f221e5445891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 7 Apr 2020 16:27:32 +0200 Subject: [PATCH 5/6] Fix comment. --- pkg/autoscaler/metrics/collector_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/autoscaler/metrics/collector_test.go b/pkg/autoscaler/metrics/collector_test.go index 52fa0ea35b47..a18d90c90c93 100644 --- a/pkg/autoscaler/metrics/collector_test.go +++ b/pkg/autoscaler/metrics/collector_test.go @@ -447,9 +447,9 @@ func TestMetricCollectorError(t *testing.T) { t.Fatalf("Event = %v, want %v", event, key) } - // Simulate a reconcile. + // Make sure the error is surfaced via 'CreateOrUpdate', which is called in the reconciler. if err := coll.CreateOrUpdate(testMetric); err != test.expectedError { - t.Fatalf("lastError = %v, want %v", err, test.expectedError) + t.Fatalf("CreateOrUpdate = %v, want %v", err, test.expectedError) } coll.Delete(testMetric.Namespace, testMetric.Name) From bbc32c833807ae44e7b30ed34cfe0193c19a9ff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Tue, 7 Apr 2020 19:09:37 +0200 Subject: [PATCH 6/6] Review comments. --- pkg/autoscaler/metrics/collector.go | 6 ++++-- pkg/reconciler/metric/metric.go | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/autoscaler/metrics/collector.go b/pkg/autoscaler/metrics/collector.go index 5b6ea1276261..6fe835b348ea 100644 --- a/pkg/autoscaler/metrics/collector.go +++ b/pkg/autoscaler/metrics/collector.go @@ -89,7 +89,8 @@ type Collector interface { Record(key types.NamespacedName, stat Stat) // Delete deletes a Metric and halts collection. Delete(string, string) error - // Watch registers a singleton function to call when collector status changes. + // Watch registers a singleton function to call when a specific collector's status changes. + // The passed name is the namespace/name of the metric owned by the respective collector. Watch(func(types.NamespacedName)) } @@ -353,7 +354,8 @@ func (c *collection) currentMetric() *av1alpha1.Metric { return c.metric } -// updateLastError updates the last error returned from the scraper. +// updateLastError updates the last error returned from the scraper +// and returns true if the error or error state changed. func (c *collection) updateLastError(err error) bool { c.errMutex.Lock() defer c.errMutex.Unlock() diff --git a/pkg/reconciler/metric/metric.go b/pkg/reconciler/metric/metric.go index 4e05d603bd78..000abc3fe4c5 100644 --- a/pkg/reconciler/metric/metric.go +++ b/pkg/reconciler/metric/metric.go @@ -45,7 +45,8 @@ func (r *reconciler) ReconcileKind(ctx context.Context, metric *v1alpha1.Metric) case metrics.ErrDidNotReceiveStat: metric.Status.MarkMetricFailed("DidNotReceiveStat", err.Error()) default: - metric.Status.MarkMetricFailed("CollectionFailed", "Failed to reconcile metric collection: "+err.Error()) + metric.Status.MarkMetricFailed("CollectionFailed", + "Failed to reconcile metric collection: "+err.Error()) } // We don't return an error because retrying is of no use. We'll be poked by collector on a change.