Skip to content

Commit

Permalink
Refactor kncloudevents Dispatcher to a class instead of functions to …
Browse files Browse the repository at this point in the history
…include the token handler later
  • Loading branch information
creydr committed Oct 5, 2023
1 parent 402f6ac commit edfef92
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 58 deletions.
5 changes: 4 additions & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

"knative.dev/eventing/cmd/broker"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/eventing/pkg/reconciler/names"
Expand All @@ -64,6 +65,7 @@ func main() {
metrics.MemStatsOrDie(ctx)

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)

var env envConfig
if err := envconfig.Process("", &env); err != nil {
Expand Down Expand Up @@ -118,9 +120,10 @@ func main() {

reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
handler, err := filter.NewHandler(logger, triggerinformer.Get(ctx), reporter, ctxFunc)
handler, err := filter.NewHandler(logger, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

cmdbroker "knative.dev/eventing/cmd/broker"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
broker "knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/broker/ingress"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
Expand Down Expand Up @@ -82,6 +83,7 @@ func main() {
metrics.MemStatsOrDie(ctx)

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)

var env envConfig
if err := envconfig.Process("", &env); err != nil {
Expand Down Expand Up @@ -150,7 +152,9 @@ func main() {

reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer)
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenProvider)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
18 changes: 11 additions & 7 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/utils"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand Down Expand Up @@ -69,14 +70,16 @@ type Handler struct {
// reporter reports stats of status code and dispatch time
reporter StatsReporter

eventDispatcher *kncloudevents.Dispatcher

triggerLister eventinglisters.TriggerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
}

// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -124,11 +127,12 @@ func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter
})

return &Handler{
reporter: reporter,
triggerLister: triggerInformer.Lister(),
logger: logger,
withContext: wc,
filtersMap: fm,
reporter: reporter,
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
triggerLister: triggerInformer.Lister(),
logger: logger,
withContext: wc,
filtersMap: fm,
}, nil
}

Expand Down Expand Up @@ -238,7 +242,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders))
dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders))
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))

Expand Down
16 changes: 14 additions & 2 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ import (
"k8s.io/apimachinery/pkg/types"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
reconcilertesting "knative.dev/pkg/reconciler/testing"

triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"

// Fake injection client
_ "knative.dev/pkg/client/injection/kube/client/fake"
)

const (
Expand Down Expand Up @@ -425,6 +429,9 @@ func TestReceiver(t *testing.T) {
s := httptest.NewServer(&fh)
defer s.Close()

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
if trig.Status.SubscriberURI != nil && trig.Status.SubscriberURI.String() == toBeReplaced {
Expand All @@ -439,7 +446,8 @@ func TestReceiver(t *testing.T) {
}
reporter := &mockReporter{}
r, err := NewHandler(
zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
logger,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
reporter,
func(ctx context.Context) context.Context {
Expand Down Expand Up @@ -606,6 +614,9 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {

filtersMap := subscriptionsapi.NewFiltersMap()

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
if trig.Status.SubscriberURI != nil && trig.Status.SubscriberURI.String() == toBeReplaced {
Expand All @@ -621,7 +632,8 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
}
reporter := &mockReporter{}
r, err := NewHandler(
zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
logger,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
reporter,
func(ctx context.Context) context.Context {
Expand Down
16 changes: 10 additions & 6 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
Expand All @@ -64,9 +65,11 @@ type Handler struct {
EvenTypeHandler *eventtype.EventTypeAutoHandler

Logger *zap.Logger

eventDispatcher *kncloudevents.Dispatcher
}

func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer) (*Handler, error) {
func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, oidcTokenProvider *auth.OIDCTokenProvider) (*Handler, error) {
connectionArgs := kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -107,10 +110,11 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve
})

return &Handler{
Defaulter: defaulter,
Reporter: reporter,
Logger: logger,
BrokerLister: brokerInformer.Lister(),
Defaulter: defaulter,
Reporter: reporter,
Logger: logger,
BrokerLister: brokerInformer.Lister(),
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
}, nil
}

Expand Down Expand Up @@ -282,7 +286,7 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
return http.StatusBadRequest, kncloudevents.NoDuration
}

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers))
dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers))
if err != nil {
h.Logger.Error("failed to dispatch event", zap.Error(err))
return http.StatusInternalServerError, kncloudevents.NoDuration
Expand Down
8 changes: 7 additions & 1 deletion pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
reconcilertesting "knative.dev/pkg/reconciler/testing"

brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"

// Fake injection client
_ "knative.dev/pkg/client/injection/kube/client/fake"
)

const (
Expand Down Expand Up @@ -281,7 +285,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
brokerinformerfake.Get(ctx).Informer().GetStore().Add(b)
}

h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx), oidcTokenProvider)
if err != nil {
t.Fatal("Unable to create receiver:", err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type FanoutEventHandler struct {

receiver *channel.EventReceiver

eventDispatcher *kncloudevents.Dispatcher

// TODO: Plumb context through the receiver and dispatcher and use that to store the timeout,
// rather than a member variable.
timeout time.Duration
Expand All @@ -100,6 +102,7 @@ func NewFanoutEventHandler(
eventTypeHandler *eventtype.EventTypeAutoHandler,
channelAddressable *duckv1.KReference,
channelUID *types.UID,
eventDispatcher *kncloudevents.Dispatcher,
receiverOpts ...channel.EventReceiverOptions,
) (*FanoutEventHandler, error) {
handler := &FanoutEventHandler{
Expand All @@ -110,6 +113,7 @@ func NewFanoutEventHandler(
eventTypeHandler: eventTypeHandler,
channelAddressable: channelAddressable,
channelUID: channelUID,
eventDispatcher: eventDispatcher,
}
handler.subscriptions = make([]Subscription, len(config.Subscriptions))
copy(handler.subscriptions, config.Subscriptions)
Expand Down Expand Up @@ -313,7 +317,7 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription,
// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.Event, additionalHeaders nethttp.Header, sub Subscription) (*kncloudevents.DispatchInfo, error) {
return kncloudevents.SendEvent(ctx, event, sub.Subscriber,
return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber,
kncloudevents.WithHeader(additionalHeaders),
kncloudevents.WithReply(sub.Reply),
kncloudevents.WithDeadLetterSink(sub.DeadLetter),
Expand Down
15 changes: 14 additions & 1 deletion pkg/channel/fanout/fanout_event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/kncloudevents"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/injection"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -43,6 +46,8 @@ import (
"knative.dev/pkg/apis"

"knative.dev/eventing/pkg/channel"
fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/system/testing"
)

// Domains used in subscriptions, which will be replaced by the real domains of the started HTTP
Expand Down Expand Up @@ -318,6 +323,10 @@ func TestFanoutEventHandler_ServeHTTP(t *testing.T) {
}

func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.EventReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
ctx := context.Background()
ctx, _ = fakekubeclient.With(ctx)
ctx = injection.WithConfig(ctx, &rest.Config{})

var subscriberServerWg *sync.WaitGroup
reporter := channel.NewStatsReporter("testcontainer", "testpod")
if subscriberReqs != 0 {
Expand Down Expand Up @@ -362,6 +371,9 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event
t.Fatal(err)
}

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider)

calledChan := make(chan bool, 1)
recvOptionFunc := func(*channel.EventReceiver) error {
calledChan <- true
Expand All @@ -378,6 +390,7 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event
nil,
nil,
nil,
dispatcher,
recvOptionFunc,
)
<-calledChan
Expand All @@ -403,7 +416,7 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event
reqCtx, _ := trace.StartSpan(context.TODO(), "bla")
req := httptest.NewRequest(http.MethodPost, "http://channelname.channelnamespace/", nil).WithContext(reqCtx)

ctx := context.Background()
ctx = context.Background()

if err := bindingshttp.WriteRequest(ctx, binding.ToMessage(&event), req); err != nil {
t.Fatal("WriteRequest =", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
)

type MultiChannelEventHandler interface {
Expand Down Expand Up @@ -64,7 +65,7 @@ func NewEventHandler(_ context.Context, logger *zap.Logger) *EventHandler {

// NewEventHandlerWithConfig creates a new Handler with the specified configuration. This is really meant for tests
// where you want to apply a fully specified configuration for tests. Reconciler operates on single channel at a time.
func NewEventHandlerWithConfig(_ context.Context, logger *zap.Logger, conf Config, reporter channel.StatsReporter, recvOptions ...channel.EventReceiverOptions) (*EventHandler, error) {
func NewEventHandlerWithConfig(_ context.Context, logger *zap.Logger, conf Config, reporter channel.StatsReporter, eventDispatcher *kncloudevents.Dispatcher, recvOptions ...channel.EventReceiverOptions) (*EventHandler, error) {
handlers := make(map[string]fanout.EventHandler, len(conf.ChannelConfigs))

for _, cc := range conf.ChannelConfigs {
Expand All @@ -73,7 +74,7 @@ func NewEventHandlerWithConfig(_ context.Context, logger *zap.Logger, conf Confi
if key == "" {
continue
}
handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, cc.ChannelUID, recvOptions...)
handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, cc.ChannelUID, eventDispatcher, recvOptions...)
if err != nil {
logger.Error("Failed creating new fanout handler.", zap.Error(err))
return nil, err
Expand Down
Loading

0 comments on commit edfef92

Please sign in to comment.