Skip to content

Commit

Permalink
Refactor kncloudevents Dispatcher to a class instead of functions to …
Browse files Browse the repository at this point in the history
…include the token handler later

Fix KinD setup with valid service account issuer

Add github.com/coreos/oidc module

Add basic library for OIDC token handling

Update kncloudevents.Dispatcher to include OIDCTokenHandler

Reuse logger

Simplify tokenHandler by loading provider config during initialization

Support external issuer case

Remove discovery RestClient usage

Renaming

Pass context to initProvider

Get everything from context

Split into Provider and Verifier
  • Loading branch information
creydr committed Oct 2, 2023
1 parent 243c52e commit f52e92a
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 59 deletions.
5 changes: 4 additions & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

"knative.dev/eventing/cmd/broker"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/eventing/pkg/reconciler/names"
Expand All @@ -64,6 +65,7 @@ func main() {
metrics.MemStatsOrDie(ctx)

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)

var env envConfig
if err := envconfig.Process("", &env); err != nil {
Expand Down Expand Up @@ -118,9 +120,10 @@ func main() {

reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
handler, err := filter.NewHandler(logger, triggerinformer.Get(ctx), reporter, ctxFunc)
handler, err := filter.NewHandler(logger, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

cmdbroker "knative.dev/eventing/cmd/broker"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
broker "knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/broker/ingress"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
Expand Down Expand Up @@ -82,6 +83,7 @@ func main() {
metrics.MemStatsOrDie(ctx)

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)

var env envConfig
if err := envconfig.Process("", &env); err != nil {
Expand Down Expand Up @@ -150,7 +152,9 @@ func main() {

reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer)
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenProvider)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/utils"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand Down Expand Up @@ -69,14 +70,16 @@ type Handler struct {
// reporter reports stats of status code and dispatch time
reporter StatsReporter

eventDispatcher *kncloudevents.Dispatcher

triggerLister eventinglisters.TriggerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
}

// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -124,11 +127,12 @@ func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter
})

return &Handler{
reporter: reporter,
triggerLister: triggerInformer.Lister(),
logger: logger,
withContext: wc,
filtersMap: fm,
reporter: reporter,
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
triggerLister: triggerInformer.Lister(),
logger: logger,
withContext: wc,
filtersMap: fm,
}, nil
}

Expand Down Expand Up @@ -214,6 +218,10 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

// get audience of triggers subscriber from t.Status.SubscriberAudience somehow (new field)

// get triggers (?!?) .status.auth.serviceAccountName

// Check if the event should be sent.
ctx = logging.WithLogger(ctx, h.logger.Sugar().With(zap.String("trigger", fmt.Sprintf("%s/%s", t.GetNamespace(), t.GetName()))))
filterResult := h.filterEvent(ctx, t, *event)
Expand All @@ -238,7 +246,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders))
dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders))
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))

Expand Down
16 changes: 14 additions & 2 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ import (
"k8s.io/apimachinery/pkg/types"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
reconcilertesting "knative.dev/pkg/reconciler/testing"

triggerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"

// Fake injection client
_ "knative.dev/pkg/client/injection/kube/client/fake"
)

const (
Expand Down Expand Up @@ -425,6 +429,9 @@ func TestReceiver(t *testing.T) {
s := httptest.NewServer(&fh)
defer s.Close()

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
if trig.Status.SubscriberURI != nil && trig.Status.SubscriberURI.String() == toBeReplaced {
Expand All @@ -439,7 +446,8 @@ func TestReceiver(t *testing.T) {
}
reporter := &mockReporter{}
r, err := NewHandler(
zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
logger,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
reporter,
func(ctx context.Context) context.Context {
Expand Down Expand Up @@ -606,6 +614,9 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {

filtersMap := subscriptionsapi.NewFiltersMap()

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
if trig.Status.SubscriberURI != nil && trig.Status.SubscriberURI.String() == toBeReplaced {
Expand All @@ -621,7 +632,8 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
}
reporter := &mockReporter{}
r, err := NewHandler(
zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
logger,
oidcTokenProvider,
triggerinformerfake.Get(ctx),
reporter,
func(ctx context.Context) context.Context {
Expand Down
40 changes: 34 additions & 6 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
Expand All @@ -64,9 +65,13 @@ type Handler struct {
EvenTypeHandler *eventtype.EventTypeAutoHandler

Logger *zap.Logger

oidcTokenProvider *auth.OIDCTokenProvider

eventDispatcher *kncloudevents.Dispatcher
}

func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer) (*Handler, error) {
func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, oidcTokenProvider *auth.OIDCTokenProvider) (*Handler, error) {
connectionArgs := kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -107,10 +112,12 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve
})

return &Handler{
Defaulter: defaulter,
Reporter: reporter,
Logger: logger,
BrokerLister: brokerInformer.Lister(),
Defaulter: defaulter,
Reporter: reporter,
Logger: logger,
BrokerLister: brokerInformer.Lister(),
oidcTokenProvider: oidcTokenProvider,
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
}, nil
}

Expand Down Expand Up @@ -182,6 +189,27 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

ctx := request.Context()

// jwt, err := h.oidcTokenHandler.GetJWT(types.NamespacedName{
// Name: "mt-broker-ingress",
// Namespace: "knative-eventing",
// }, "my-audience")
// if err != nil {
// h.Logger.Error("could not get token", zap.Error(err))
// writer.WriteHeader(http.StatusInternalServerError)
// return
// }

// h.Logger.Info("got a JWT", zap.String("jwt", jwt))

// idToken, err := h.oidcTokenHandler.VerifyJWT(ctx, jwt, "my-audience")
// if err != nil {
// h.Logger.Error("could not verify token", zap.Error(err))
// writer.WriteHeader(http.StatusInternalServerError)
// return
// }

// h.Logger.Info("verified token", zap.Any("idtoken", idToken))

message := cehttp.NewMessageFromHttpRequest(request)
defer message.Finish(nil)

Expand Down Expand Up @@ -282,7 +310,7 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
return http.StatusBadRequest, kncloudevents.NoDuration
}

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers))
dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers))
if err != nil {
h.Logger.Error("failed to dispatch event", zap.Error(err))
return http.StatusInternalServerError, kncloudevents.NoDuration
Expand Down
8 changes: 7 additions & 1 deletion pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker"
reconcilertesting "knative.dev/pkg/reconciler/testing"

brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"

// Fake injection client
_ "knative.dev/pkg/client/injection/kube/client/fake"
)

const (
Expand Down Expand Up @@ -281,7 +285,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
brokerinformerfake.Get(ctx).Informer().GetStore().Add(b)
}

h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx), oidcTokenProvider)
if err != nil {
t.Fatal("Unable to create receiver:", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
ReportEventCountMetricsForDispatchError(err, r.reporter, &args)
return
}
r.logger.Debug("Request mapped to channel", zap.String("channel", channel.String()))
r.logger.Info("Request mapped to channel", zap.String("channel", channel.String()))

args.Ns = channel.Namespace

Expand Down
6 changes: 5 additions & 1 deletion pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type FanoutEventHandler struct {

receiver *channel.EventReceiver

eventDispatcher *kncloudevents.Dispatcher

// TODO: Plumb context through the receiver and dispatcher and use that to store the timeout,
// rather than a member variable.
timeout time.Duration
Expand All @@ -100,6 +102,7 @@ func NewFanoutEventHandler(
eventTypeHandler *eventtype.EventTypeAutoHandler,
channelAddressable *duckv1.KReference,
channelUID *types.UID,
eventDispatcher *kncloudevents.Dispatcher,
receiverOpts ...channel.EventReceiverOptions,
) (*FanoutEventHandler, error) {
handler := &FanoutEventHandler{
Expand All @@ -110,6 +113,7 @@ func NewFanoutEventHandler(
eventTypeHandler: eventTypeHandler,
channelAddressable: channelAddressable,
channelUID: channelUID,
eventDispatcher: eventDispatcher,
}
handler.subscriptions = make([]Subscription, len(config.Subscriptions))
copy(handler.subscriptions, config.Subscriptions)
Expand Down Expand Up @@ -313,7 +317,7 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription,
// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.Event, additionalHeaders nethttp.Header, sub Subscription) (*kncloudevents.DispatchInfo, error) {
return kncloudevents.SendEvent(ctx, event, sub.Subscriber,
return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber,
kncloudevents.WithHeader(additionalHeaders),
kncloudevents.WithReply(sub.Reply),
kncloudevents.WithDeadLetterSink(sub.DeadLetter),
Expand Down
15 changes: 14 additions & 1 deletion pkg/channel/fanout/fanout_event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/kncloudevents"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/injection"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -43,6 +46,8 @@ import (
"knative.dev/pkg/apis"

"knative.dev/eventing/pkg/channel"
fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/system/testing"
)

// Domains used in subscriptions, which will be replaced by the real domains of the started HTTP
Expand Down Expand Up @@ -318,6 +323,10 @@ func TestFanoutEventHandler_ServeHTTP(t *testing.T) {
}

func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.EventReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
ctx := context.Background()
ctx, _ = fakekubeclient.With(ctx)
ctx = injection.WithConfig(ctx, &rest.Config{})

var subscriberServerWg *sync.WaitGroup
reporter := channel.NewStatsReporter("testcontainer", "testpod")
if subscriberReqs != 0 {
Expand Down Expand Up @@ -362,6 +371,9 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event
t.Fatal(err)
}

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider)

calledChan := make(chan bool, 1)
recvOptionFunc := func(*channel.EventReceiver) error {
calledChan <- true
Expand All @@ -378,6 +390,7 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event
nil,
nil,
nil,
dispatcher,
recvOptionFunc,
)
<-calledChan
Expand All @@ -403,7 +416,7 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event
reqCtx, _ := trace.StartSpan(context.TODO(), "bla")
req := httptest.NewRequest(http.MethodPost, "http://channelname.channelnamespace/", nil).WithContext(reqCtx)

ctx := context.Background()
ctx = context.Background()

if err := bindingshttp.WriteRequest(ctx, binding.ToMessage(&event), req); err != nil {
t.Fatal("WriteRequest =", err)
Expand Down
Loading

0 comments on commit f52e92a

Please sign in to comment.