Skip to content

Commit

Permalink
Add metrics to watch manager (#366)
Browse files Browse the repository at this point in the history
* Add metrics to watch manager

Signed-off-by: Max Smythe <smythe@google.com>

* Address review comments

Signed-off-by: Max Smythe <smythe@google.com>

* Reset state instead of using a count basis

Signed-off-by: Max Smythe <smythe@google.com>

* Fix lint errors

Signed-off-by: Max Smythe <smythe@google.com>
  • Loading branch information
maxsmythe authored Jan 3, 2020
1 parent 1f0ea7b commit a2981bb
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 9 deletions.
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ func main() {
setupLog.Error(err, "unable to set up OPA client")
}

wm := watch.New(mgr.GetConfig())
wm, err := watch.New(mgr.GetConfig())
if err != nil {
setupLog.Error(err, "unable to create watch manager")
os.Exit(1)
}
if err := mgr.Add(wm); err != nil {
setupLog.Error(err, "unable to register watch manager to the manager")
os.Exit(1)
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/config/config_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ func TestReconcile(t *testing.T) {
ctrl.SetLogger(zap.Logger(true))
mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: "0"})
g.Expect(err).NotTo(gomega.HaveOccurred())
watcher := watch.New(mgr.GetConfig())
watcher, err := watch.New(mgr.GetConfig())
if err != nil {
t.Fatalf("could not create watch manager: %s", err)
}
if err := mgr.Add(watcher); err != nil {
t.Fatalf("could not add watch manager to manager: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/constraint/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestReportConstraints(t *testing.T) {
}
}
if int64(value.Value) != expectedValue {
t.Errorf("Metric: %v - Expected %v, got %v", totalConstraintsName, value.Value, expectedValue)
t.Errorf("Metric: %v - Expected %v, got %v", totalConstraintsName, expectedValue, value.Value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ violation[{"msg": "denied!"}] {
ctrl.SetLogger(zap.Logger(true))
mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: "0"})
g.Expect(err).NotTo(gomega.HaveOccurred())
wm := watch.New(mgr.GetConfig())
wm, err := watch.New(mgr.GetConfig())
if err != nil {
t.Fatalf("could not create watch manager: %s", err)
}
if err := mgr.Add(wm); err != nil {
t.Fatalf("could not add watch manager to manager: %s", err)
}
Expand Down
33 changes: 31 additions & 2 deletions pkg/watch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Manager struct {
watchedKinds map[schema.GroupVersionKind]vitals
cfg *rest.Config
newDiscovery func(*rest.Config) (Discovery, error)
metrics *reporter
}

type Discovery interface {
Expand All @@ -47,18 +48,23 @@ func newDiscovery(c *rest.Config) (Discovery, error) {
return discovery.NewDiscoveryClientForConfig(c)
}

func New(cfg *rest.Config) *Manager {
func New(cfg *rest.Config) (*Manager, error) {
metrics, err := newStatsReporter()
if err != nil {
return nil, err
}
wm := &Manager{
newMgrFn: newMgr,
stopper: func() {},
managedKinds: newRecordKeeper(),
watchedKinds: make(map[schema.GroupVersionKind]vitals),
cfg: cfg,
newDiscovery: newDiscovery,
metrics: metrics,
}
wm.started.Store(false)
wm.managedKinds.mgr = wm
return wm
return wm, nil
}

func (wm *Manager) NewRegistrar(parent string, addFns []func(manager.Manager, schema.GroupVersionKind) error) (*Registrar, error) {
Expand All @@ -82,6 +88,9 @@ func (wm *Manager) updateManager() (bool, error) {
if err != nil {
return false, errp.Wrap(err, "error while retrieving managedKinds, not restarting watch manager")
}
if err := wm.metrics.reportGvkIntentCount(int64(len(intent))); err != nil {
log.Error(err, "while reporting gvk intent count metric")
}
added, removed, changed, err := wm.gatherChanges(intent)
if err != nil {
return false, errp.Wrap(err, "error gathering watch changes, not restarting watch manager")
Expand Down Expand Up @@ -160,6 +169,10 @@ func (wm *Manager) Start(done <-chan struct{}) error {
func (wm *Manager) updateOrPause() (bool, error) {
wm.startedMux.Lock()
defer wm.startedMux.Unlock()
// Report restart check after acquiring the lock so that we can detect deadlocks
if err := wm.metrics.reportRestartCheck(); err != nil {
log.Error(err, "while trying to report restart check metric")
}
if wm.paused {
log.Info("update manager is paused")
return false, nil
Expand Down Expand Up @@ -219,6 +232,11 @@ func (wm *Manager) restartManager(kinds map[schema.GroupVersionKind]vitals) erro
}
}

// reporting the restart after all potentially blocking calls will help narrow
// down the cause of any deadlocks by checking if last_restart > last_restart_check
if err := wm.metrics.reportRestart(); err != nil {
log.Error(err, "while trying to report restart metric")
}
wm.stopped = make(chan struct{})
stopper := make(chan struct{})
stopOnce := sync.Once{}
Expand All @@ -232,6 +250,17 @@ func (wm *Manager) restartManager(kinds map[schema.GroupVersionKind]vitals) erro
func (wm *Manager) startMgr(mgr manager.Manager, stopper chan struct{}, stopped chan<- struct{}, kinds []string) {
defer wm.started.Store(false)
defer close(stopped)
if err := wm.metrics.reportIsRunning(1); err != nil {
log.Error(err, "while trying to report running metric")
}
defer func() {
if err := wm.metrics.reportIsRunning(0); err != nil {
log.Error(err, "while trying to report stopped metric")
}
}()
if err := wm.metrics.reportGvkCount(int64(len(kinds))); err != nil {
log.Error(err, "while trying to report gvk count metric")
}
log.Info("Calling Manager.Start()", "kinds", kinds)
wm.started.Store(true)
if err := mgr.Start(stopper); err != nil {
Expand Down
14 changes: 11 additions & 3 deletions pkg/watch/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

func newForTest(fn func(*rest.Config) (Discovery, error)) *Manager {
func newForTest(fn func(*rest.Config) (Discovery, error)) (*Manager, error) {
metrics, err := newStatsReporter()
if err != nil {
return nil, err
}
wm := &Manager{
newMgrFn: newFakeMgr,
stopper: func() {},
managedKinds: newRecordKeeper(),
watchedKinds: make(map[schema.GroupVersionKind]vitals),
cfg: nil,
newDiscovery: fn,
metrics: metrics,
}
wm.managedKinds.mgr = wm
wm.started.Store(false)
return wm
return wm, nil
}

func newFakeMgr(wm *Manager) (manager.Manager, error) {
Expand Down Expand Up @@ -137,7 +142,10 @@ func waitForWatchManagerStart(wm *Manager) bool {
}

func TestRegistrar(t *testing.T) {
wm := newForTest(newDiscoveryFactory(false, "FooCRD"))
wm, err := newForTest(newDiscoveryFactory(false, "FooCRD"))
if err != nil {
t.Fatalf("Error creating Manager: %s", err)
}
defer wm.close()
reg, err := wm.NewRegistrar("foo", nil)
if err != nil {
Expand Down
124 changes: 124 additions & 0 deletions pkg/watch/stats_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package watch

import (
"context"
"time"

"github.com/open-policy-agent/gatekeeper/pkg/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

const (
lastRestart = "watch_manager_last_restart_time"
lastRestartCheck = "watch_manager_last_restart_check_time"
totalRestarts = "watch_manager_total_restart_attempts"
gvkCount = "watch_manager_total_watched_gvk"
gvkIntentCount = "watch_manager_total_intended_watch_gvk"
isRunning = "watch_manager_is_running"
)

var (
lastRestartM = stats.Float64(lastRestart, "Timestamp of last watch manager restart", stats.UnitSeconds)
lastRestartCheckM = stats.Float64(lastRestartCheck, "Timestamp of last time watch manager checked if it needed to restart", stats.UnitSeconds)
gvkCountM = stats.Int64(gvkCount, "Total number of watched GroupVersionKinds", stats.UnitDimensionless)
gvkIntentCountM = stats.Int64(gvkIntentCount, "Total number of GroupVersionKinds with a registered watch intent", stats.UnitDimensionless)
isRunningM = stats.Int64(isRunning, "One if the watch manager is running, zero if not", stats.UnitDimensionless)

views = []*view.View{
{
Name: lastRestart,
Measure: lastRestartM,
Description: "The epoch timestamp of the last time the watch manager has restarted",
Aggregation: view.LastValue(),
},
{
Name: totalRestarts,
Measure: lastRestartM,
Description: "Total number of times the watch manager has restarted",
Aggregation: view.Count(),
},
{
Name: lastRestartCheck,
Measure: lastRestartCheckM,
Description: "The epoch timestamp of the last time the watch manager was checked for a restart condition. This is a heartbeat that should occur regularly",
Aggregation: view.LastValue(),
},
{
Name: gvkCount,
Measure: gvkCountM,
Description: "The total number of Group/Version/Kinds currently watched by the watch manager",
Aggregation: view.LastValue(),
},
{
Name: gvkIntentCount,
Measure: gvkIntentCountM,
Description: "The total number of Group/Version/Kinds that the watch manager has instructions to watch. This could differ from the actual count due to resources being pending, non-existent, or a failure of the watch manager to restart",
Aggregation: view.LastValue(),
},
{
Name: isRunning,
Measure: isRunningM,
Description: "Whether the watch manager is running. This is expected to be 1 the majority of the time with brief periods of downtime due to the watch manager being paused or restarted",
Aggregation: view.LastValue(),
},
}
)

func init() {
if err := register(); err != nil {
panic(err)
}
}

func register() error {
return view.Register(views...)
}

func reset() error {
view.Unregister(views...)
return register()
}

// now returns the timestamp as a second-denominated float
func now() float64 {
return float64(time.Now().UnixNano()) / 1e9
}

func (r *reporter) reportRestartCheck() error {
return metrics.Record(r.ctx, lastRestartCheckM.M(r.now()))
}

func (r *reporter) reportRestart() error {
return metrics.Record(r.ctx, lastRestartM.M(r.now()))
}

func (r *reporter) reportGvkCount(count int64) error {
return metrics.Record(r.ctx, gvkCountM.M(count))
}

func (r *reporter) reportGvkIntentCount(count int64) error {
return metrics.Record(r.ctx, gvkIntentCountM.M(count))
}

func (r *reporter) reportIsRunning(running int64) error {
return metrics.Record(r.ctx, isRunningM.M(running))
}

// newStatsReporter creates a reporter for watch metrics
func newStatsReporter() (*reporter, error) {
ctx, err := tag.New(
context.TODO(),
)
if err != nil {
return nil, err
}

return &reporter{ctx: ctx, now: now}, nil
}

type reporter struct {
ctx context.Context
now func() float64
}
Loading

0 comments on commit a2981bb

Please sign in to comment.