Skip to content

Commit

Permalink
refactor mapper methods
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnotashwin committed Aug 15, 2023
1 parent faefe8b commit b1cd8ad
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 49 deletions.
9 changes: 6 additions & 3 deletions internal/controller/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap
return c
}

// WithCustomWatch adds a custom watch on the given type/dependency to the controller. custom mapper
// will be called to determine which resources must be reconciled as a result of
// an event.
// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper
// will be called to map events produced by source to the controller's watched type.
func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller {
if source == nil {
panic("source must not be nil")
Expand Down Expand Up @@ -158,14 +157,18 @@ type Source struct {
Source <-chan Event
}

// Event captures an event in the system which the API can choose to respond to.
type Event struct {
Obj queue.ItemType
}

// Key returns a string that will be used to de-duplicate items in the queue.
func (e Event) Key() string {
return e.Obj.Key()
}

// customWatch represent a Watch on a custom Event source and a Mapper to map said
// Events into Requests that the controller can respond to.
type customWatch struct {
source *Source
mapper CustomDependencyMapper
Expand Down
1 change: 1 addition & 0 deletions internal/controller/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestController_API(t *testing.T) {
client := svctest.RunResourceService(t, demo.RegisterTypes)

concertsChan := make(chan controller.Event)
defer close(concertsChan)
concertSource := &controller.Source{Source: concertsChan}
concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
artistID := event.Obj.(*Concert).artistID
Expand Down
89 changes: 43 additions & 46 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,41 @@ func (c *controllerRunner) run(ctx context.Context) error {
})
})

for _, watch := range c.ctrl.watches {
for _, w := range c.ctrl.watches {
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)

// Watched Type Events → Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) {
return c.watch(groupCtx, w.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
})

// Mapper Queue → Mapper → Reconciliation Queue
watcher := w
group.Go(func() error {
return c.runMapper(groupCtx, watch, mapQueue, recQueue)
return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
})
})
}

for _, customWatch := range c.ctrl.customWatches {
for _, cw := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)

// Custom Events → Mapper Queue
group.Go(func() error {
return customWatch.source.Watch(groupCtx, func(e Event) {
return cw.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})

// Mapper Queue → Mapper → Reconciliation Queue
watcher := cw
group.Go(func() error {
return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue)
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(Event))
})
})
}

Expand All @@ -86,7 +92,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work
}

func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Expand All @@ -100,7 +106,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
}

for {
event, err := watch.Recv()
event, err := wl.Recv()
if err != nil {
c.logger.Warn("error received from watch", "error", err)
return err
Expand All @@ -114,6 +120,7 @@ func (c *controllerRunner) runMapper(
w watch,
from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))

Expand All @@ -123,76 +130,66 @@ func (c *controllerRunner) runMapper(
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item.res)
return err
})
if err != nil {
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) runCustomMapper(
ctx context.Context,
w customWatch,
cw customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_event", w.source)
logger := c.logger.With("watched_event", cw.source)

for {
item, shutdown := from.Get()
if shutdown {
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item)
return err
})
if err != nil {
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
var reqs []Request
if err := c.handlePanic(func() error {
var err error
reqs, err = mapper(ctx, c.runtime(), item)
return err
}); err != nil {
return err
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}
return nil
}

func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
for {
req, shutdown := queue.Get()
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/dependency_mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type DependencyMapper func(
res *pbresource.Resource,
) ([]Request, error)

// CustomDependencyMapper is called when an Event occurs to determine which of the
// controller's managed resources need to be reconciled.
type CustomDependencyMapper func(
ctx context.Context,
rt Runtime,
Expand Down

0 comments on commit b1cd8ad

Please sign in to comment.