From 4b1b1d57e00c6ef083d01a523d48762cd74987b4 Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Fri, 11 Aug 2023 16:24:19 -0400 Subject: [PATCH] refactor mapper methods --- internal/controller/controller.go | 89 +++++++++++++++---------------- 1 file changed, 43 insertions(+), 46 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 916de1fdc2773..b10665dcd1d14 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -40,35 +40,41 @@ func (c *controllerRunner) run(ctx context.Context) error { }) }) - for _, watch := range c.ctrl.watches { + for _, w := range c.ctrl.watches { mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) // Watched Type Events → Mapper Queue group.Go(func() error { - return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) { + return c.watch(groupCtx, w.watchedType, func(res *pbresource.Resource) { mapQueue.Add(mapperRequest{res: res}) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runMapper(groupCtx, watch, mapQueue, recQueue) + watcher := w + return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return w.mapper(ctx, runtime, itemType.(mapperRequest).res) + }) }) } - for _, customWatch := range c.ctrl.customWatches { + for _, cw := range c.ctrl.customWatches { customMapQueue := runQueue[Event](groupCtx, c.ctrl) // Custom Events → Mapper Queue group.Go(func() error { - return customWatch.source.Watch(groupCtx, func(e Event) { + return cw.source.Watch(groupCtx, func(e Event) { customMapQueue.Add(e) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue) + watcher := cw + return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return cw.mapper(ctx, runtime, itemType.(Event)) + }) }) } @@ -86,7 +92,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work } func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { - watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ + wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ Type: typ, Tenancy: &pbresource.Tenancy{ Partition: storage.Wildcard, @@ -100,7 +106,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add } for { - event, err := watch.Recv() + event, err := wl.Recv() if err != nil { c.logger.Warn("error received from watch", "error", err) return err @@ -114,6 +120,7 @@ func (c *controllerRunner) runMapper( w watch, from queue.WorkQueue[mapperRequest], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) @@ -123,29 +130,12 @@ func (c *controllerRunner) runMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item.res) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) - } - from.Forget(item) from.Done(item) } @@ -153,11 +143,12 @@ func (c *controllerRunner) runMapper( func (c *controllerRunner) runCustomMapper( ctx context.Context, - w customWatch, + cw customWatch, from queue.WorkQueue[Event], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { - logger := c.logger.With("watched_event", w.source) + logger := c.logger.With("watched_event", cw.source) for { item, shutdown := from.Get() @@ -165,34 +156,40 @@ func (c *controllerRunner) runCustomMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) - } - from.Forget(item) from.Done(item) } } +func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error { + var reqs []Request + if err := c.handlePanic(func() error { + var err error + reqs, err = mapper(ctx, c.runtime(), item) + return err + }); err != nil { + return err + } + + for _, r := range reqs { + if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { + logger.Error("dependency mapper returned request for a resource of the wrong type", + "type_expected", resource.ToGVK(c.ctrl.managedType), + "type_got", resource.ToGVK(r.ID.Type), + ) + continue + } + to.Add(r) + } + return nil +} + func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get()