From ba84bb5ca7bfae4879b166ffd0b90d833ccab5a3 Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Wed, 9 Aug 2023 14:42:40 -0400 Subject: [PATCH] Support custom watches on controller --- internal/controller/api.go | 64 ++++++++++++++++++++--- internal/controller/api_test.go | 45 ++++++++++++++++ internal/controller/controller.go | 59 ++++++++++++++++++++- internal/controller/dependency_mappers.go | 6 +++ 4 files changed, 166 insertions(+), 8 deletions(-) diff --git a/internal/controller/api.go b/internal/controller/api.go index 8f5d873368b5..b07c966cc57e 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/consul/controller/queue" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -46,6 +47,22 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap return c } +// WithCustomWatch adds a custom watch on the given type/dependency to the controller. custom mapper +// will be called to determine which resources must be reconciled as a result of +// an event. +func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller { + if source == nil { + panic("source must not be nil") + } + + if mapper == nil { + panic("mapper must not be nil") + } + + c.customWatches = append(c.customWatches, customWatch{source, mapper}) + return c +} + // WithLogger changes the controller's logger. func (c Controller) WithLogger(logger hclog.Logger) Controller { if logger == nil { @@ -107,13 +124,14 @@ func (c Controller) backoff() (time.Duration, time.Duration) { // Use the builder methods in this package (starting with ForType) to construct // a controller, and then pass it to a Manager to be executed. type Controller struct { - managedType *pbresource.Type - reconciler Reconciler - logger hclog.Logger - watches []watch - baseBackoff time.Duration - maxBackoff time.Duration - placement Placement + managedType *pbresource.Type + reconciler Reconciler + logger hclog.Logger + watches []watch + customWatches []customWatch + baseBackoff time.Duration + maxBackoff time.Duration + placement Placement } type watch struct { @@ -121,6 +139,38 @@ type watch struct { mapper DependencyMapper } +// Watch is responsible for watching for custom events from source and adding them to +// the event queue. +func (s *Source) Watch(ctx context.Context, add func(e Event)) error { + for { + select { + case <-ctx.Done(): + return nil + case evt := <-s.Source: + add(evt) + } + } +} + +// Source is used as a generic source of events. This can be used when events aren't coming from resources +// stored by the resource API. +type Source struct { + Source <-chan Event +} + +type Event struct { + Obj queue.ItemType +} + +func (e Event) Key() string { + return e.Obj.Key() +} + +type customWatch struct { + source *Source + mapper CustomDependencyMapper +} + // Request represents a request to reconcile the resource with the given ID. type Request struct { // ID of the resource that needs to be reconciled. diff --git a/internal/controller/api_test.go b/internal/controller/api_test.go index e80a2d7d7133..6bd8bc322017 100644 --- a/internal/controller/api_test.go +++ b/internal/controller/api_test.go @@ -25,9 +25,19 @@ func TestController_API(t *testing.T) { rec := newTestReconciler() client := svctest.RunResourceService(t, demo.RegisterTypes) + concertsChan := make(chan controller.Event) + concertSource := &controller.Source{Source: concertsChan} + concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) { + artistID := event.Obj.(*Concert).artistID + var requests []controller.Request + requests = append(requests, controller.Request{ID: artistID}) + return requests, nil + } + ctrl := controller. ForType(demo.TypeV2Artist). WithWatch(demo.TypeV2Album, controller.MapOwner). + WithCustomWatch(concertSource, concertMapper). WithBackoff(10*time.Millisecond, 100*time.Millisecond). WithReconciler(rec) @@ -69,6 +79,32 @@ func TestController_API(t *testing.T) { prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) }) + t.Run("custom watched resource type", func(t *testing.T) { + res, err := demo.GenerateV2Artist() + require.NoError(t, err) + + rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res}) + require.NoError(t, err) + + req := rec.wait(t) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + rec.expectNoRequest(t, 500*time.Millisecond) + + concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}} + + watchedReq := rec.wait(t) + prototest.AssertDeepEqual(t, req.ID, watchedReq.ID) + + otherArtist, err := demo.GenerateV2Artist() + require.NoError(t, err) + + concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}} + + watchedReq = rec.wait(t) + prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID) + }) + t.Run("error retries", func(t *testing.T) { rec.failNext(errors.New("KABOOM")) @@ -266,3 +302,12 @@ func testContext(t *testing.T) context.Context { return ctx } + +type Concert struct { + name string + artistID *pbresource.ID +} + +func (c Concert) Key() string { + return c.name +} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 54d5c57386a3..5d25ca63d926 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -41,7 +41,6 @@ func (c *controllerRunner) run(ctx context.Context) error { }) for _, watch := range c.ctrl.watches { - watch := watch mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) // Watched Type Events → Mapper Queue @@ -57,6 +56,22 @@ func (c *controllerRunner) run(ctx context.Context) error { }) } + for _, customWatch := range c.ctrl.customWatches { + customMapQueue := runQueue[Event](groupCtx, c.ctrl) + + // Custom Events → Mapper Queue + group.Go(func() error { + return customWatch.source.Watch(groupCtx, func(e Event) { + customMapQueue.Add(e) + }) + }) + + // Mapper Queue → Mapper → Reconciliation Queue + group.Go(func() error { + return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue) + }) + } + // Reconciliation Queue → Reconciler group.Go(func() error { return c.runReconciler(groupCtx, recQueue) @@ -136,6 +151,48 @@ func (c *controllerRunner) runMapper( } } +func (c *controllerRunner) runCustomMapper( + ctx context.Context, + w customWatch, + from queue.WorkQueue[Event], + to queue.WorkQueue[Request], +) error { + logger := c.logger.With("watched_event", w.source) + + for { + item, shutdown := from.Get() + if shutdown { + return nil + } + + var reqs []Request + err := c.handlePanic(func() error { + var err error + reqs, err = w.mapper(ctx, c.runtime(), item) + return err + }) + if err != nil { + from.AddRateLimited(item) + from.Done(item) + continue + } + + for _, r := range reqs { + if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { + logger.Error("dependency mapper returned request for a resource of the wrong type", + "type_expected", resource.ToGVK(c.ctrl.managedType), + "type_got", resource.ToGVK(r.ID.Type), + ) + continue + } + to.Add(r) + } + + from.Forget(item) + from.Done(item) + } +} + func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get() diff --git a/internal/controller/dependency_mappers.go b/internal/controller/dependency_mappers.go index c054c4369e94..3e85738fa66c 100644 --- a/internal/controller/dependency_mappers.go +++ b/internal/controller/dependency_mappers.go @@ -15,6 +15,12 @@ type DependencyMapper func( res *pbresource.Resource, ) ([]Request, error) +type CustomDependencyMapper func( + ctx context.Context, + rt Runtime, + event Event, +) ([]Request, error) + // MapOwner implements a DependencyMapper that returns the updated resource's owner. func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) { var reqs []Request