Skip to content

Commit

Permalink
refactor mapper methods
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnotashwin committed Aug 14, 2023
1 parent 73d12a7 commit d4afb54
Showing 1 changed file with 43 additions and 46 deletions.
89 changes: 43 additions & 46 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,41 @@ func (c *controllerRunner) run(ctx context.Context) error {
})
})

for _, watch := range c.ctrl.watches {
for _, w := range c.ctrl.watches {
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)

// Watched Type Events → Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) {
return c.watch(groupCtx, w.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
})

// Mapper Queue → Mapper → Reconciliation Queue
watcher := w
group.Go(func() error {
return c.runMapper(groupCtx, watch, mapQueue, recQueue)
return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
})
})
}

for _, customWatch := range c.ctrl.customWatches {
for _, cw := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)

// Custom Events → Mapper Queue
group.Go(func() error {
return customWatch.source.Watch(groupCtx, func(e Event) {
return cw.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})

// Mapper Queue → Mapper → Reconciliation Queue
watcher := cw
group.Go(func() error {
return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue)
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(Event))
})
})
}

Expand All @@ -86,7 +92,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work
}

func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Expand All @@ -100,7 +106,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
}

for {
event, err := watch.Recv()
event, err := wl.Recv()
if err != nil {
c.logger.Warn("error received from watch", "error", err)
return err
Expand All @@ -114,6 +120,7 @@ func (c *controllerRunner) runMapper(
w watch,
from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))

Expand All @@ -123,76 +130,66 @@ func (c *controllerRunner) runMapper(
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item.res)
return err
})
if err != nil {
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) runCustomMapper(
ctx context.Context,
w customWatch,
cw customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_event", w.source)
logger := c.logger.With("watched_event", cw.source)

for {
item, shutdown := from.Get()
if shutdown {
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item)
return err
})
if err != nil {
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
var reqs []Request
if err := c.handlePanic(func() error {
var err error
reqs, err = mapper(ctx, c.runtime(), item)
return err
}); err != nil {
return err
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}
return nil
}

func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
for {
req, shutdown := queue.Get()
Expand Down

0 comments on commit d4afb54

Please sign in to comment.