Skip to content

Commit

Permalink
Implement KReference.Group resolution for Subscriber.Ref (#5440)
Browse files Browse the repository at this point in the history
* Added the directory and the action for the experimental features e2e/conformance tests

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Removed custom build tag and using just e2e tag
Fix e2e tests

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Progress

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Trying with my fork of pkg

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Some todos

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* WIP

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Added test

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Some fixes

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Now everything works

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Rebase

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fix leftover

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Changes from pkg

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Using configStore

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fill the context with proper context decorators of specific features

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Use new way to configure tests

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Link to details

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Newline

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Remove hacky replace

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Updated with latest changes

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Added knative_reference

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fixed UT

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Missing newline

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fixed UT

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper authored Jun 16, 2021
1 parent 193809a commit 6adafe5
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 7 deletions.
17 changes: 14 additions & 3 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/apis/feature"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -91,9 +92,12 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher
channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store"))
channelStore.WatchConfigs(cmw)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return channelStore.ToContext(store.ToContext(ctx))
return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx)))
}

return defaulting.NewAdmissionController(ctx,
Expand Down Expand Up @@ -125,9 +129,13 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher

pingstore := pingdefaultconfig.NewStore(logging.FromContext(ctx).Named("ping-config-store"))
pingstore.WatchConfigs(cmw)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx)))
return featureStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx))))
}

return validation.NewAdmissionController(ctx,
Expand Down Expand Up @@ -201,9 +209,12 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro
channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store"))
channelStore.WatchConfigs(cmw)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return channelStore.ToContext(store.ToContext(ctx))
return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx)))
}

var (
Expand Down
3 changes: 3 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ metadata:
knative.dev/config-propagation: original
knative.dev/config-category: eventing
data:
# ALPHA feature: The kreference-group allows you to use the Group field in KReferences.
# For more details: https://github.com/knative/eventing/issues/5086
kreference-group: "disabled"
1 change: 1 addition & 0 deletions config/core/resources/subscription.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ spec:
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.'
type: string
x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ limitations under the License.
*/

package feature

const (
KReferenceGroup = "kreference-group"
)
10 changes: 9 additions & 1 deletion pkg/apis/feature/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package feature
import (
"context"

duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/configmap"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func FromContextOrDefaults(ctx context.Context) Flags {
// ToContext attaches the provided Flags to the provided context, returning the
// new context with the Flags attached.
func ToContext(ctx context.Context, c Flags) context.Context {
return context.WithValue(ctx, cfgKey{}, c)
return fillContextWithFeatureSpecificFlags(context.WithValue(ctx, cfgKey{}, c), c)
}

// Store is a typed wrapper around configmap.Untyped store to handle our configmaps.
Expand Down Expand Up @@ -98,3 +99,10 @@ func (s *Store) Load() Flags {
}
return loaded.(Flags)
}

func fillContextWithFeatureSpecificFlags(ctx context.Context, flags Flags) context.Context {
if flags.IsEnabled(KReferenceGroup) {
ctx = duckv1.KReferenceGroupAllowed(ctx)
}
return ctx
}
160 changes: 160 additions & 0 deletions pkg/apis/messaging/v1/subscription_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
Expand Down Expand Up @@ -243,6 +244,165 @@ func TestSubscriptionSpecValidation(t *testing.T) {
}
}

func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) {
tests := []struct {
name string
c *SubscriptionSpec
want *apis.FieldError
}{{
name: "valid",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
},
want: nil,
}, {
name: "valid with reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
Reply: getValidReply(),
},
want: nil,
}, {
name: "empty Channel",
c: &SubscriptionSpec{
Channel: duckv1.KReference{},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channel")
fe.Details = "the Subscription must reference a channel"
return fe
}(),
}, {
name: "missing name in Channel",
c: &SubscriptionSpec{
Channel: duckv1.KReference{
Kind: channelKind,
APIVersion: channelAPIVersion,
},
Subscriber: getValidDestination(),
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channel.name")
return fe
}(),
}, {
name: "missing Subscriber and Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply", "subscriber")
fe.Details = "the Subscription must reference at least one of (reply or a subscriber)"
return fe
}(),
}, {
name: "empty Subscriber and Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: &duckv1.Destination{},
Reply: &duckv1.Destination{},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply", "subscriber")
fe.Details = "the Subscription must reference at least one of (reply or a subscriber)"
return fe
}(),
}, {
name: "missing Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
},
want: nil,
}, {
name: "empty Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
Reply: &duckv1.Destination{},
},
want: nil,
}, {
name: "missing Subscriber",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Reply: getValidReply(),
},
want: nil,
}, {
name: "empty Subscriber",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: &duckv1.Destination{},
Reply: getValidReply(),
},
want: nil,
}, {
name: "missing name in channel, and missing subscriber, reply",
c: &SubscriptionSpec{
Channel: duckv1.KReference{
Kind: channelKind,
APIVersion: channelAPIVersion,
},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply", "subscriber")
fe.Details = "the Subscription must reference at least one of (reply or a subscriber)"
return apis.ErrMissingField("channel.name").Also(fe)
}(),
}, {
name: "empty",
c: &SubscriptionSpec{},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channel")
fe.Details = "the Subscription must reference a channel"
return fe
}(),
}, {
name: "missing name in Subscriber.Ref",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: &duckv1.Destination{
Ref: &duckv1.KReference{
Namespace: namespace,
Kind: channelKind,
APIVersion: channelAPIVersion,
},
},
},
want: apis.ErrMissingField("subscriber.ref.name"),
}, {
name: "missing name in Subscriber.Ref",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{
Namespace: namespace,
Name: "",
Kind: channelKind,
APIVersion: channelAPIVersion,
},
},
},
want: apis.ErrMissingField("reply.ref.name"),
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := feature.ToContext(context.TODO(), feature.Flags{
feature.KReferenceGroup: feature.Allowed,
})
got := test.c.Validate(ctx)
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("%s: validateChannel (-want, +got) = %v", test.name, diff)
}
})
}
}

func TestSubscriptionImmutable(t *testing.T) {
newChannel := getValidChannelRef()
newChannel.Name = "newChannel"
Expand Down
13 changes: 12 additions & 1 deletion pkg/reconciler/subscription/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package subscription
import (
"context"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kref"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"
Expand All @@ -44,12 +47,20 @@ func NewController(
subscriptionInformer := subscription.Get(ctx)
channelInformer := channel.Get(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

r := &Reconciler{
dynamicClientSet: dynamicclient.Get(ctx),
kreferenceResolver: kref.NewKReferenceResolver(customresourcedefinition.Get(ctx).Lister()),
subscriptionLister: subscriptionInformer.Lister(),
channelLister: channelInformer.Lister(),
}
impl := subscriptionreconciler.NewImpl(ctx, r)
impl := subscriptionreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

logging.FromContext(ctx).Info("Setting up event handlers")
subscriptionInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand Down
13 changes: 12 additions & 1 deletion pkg/reconciler/subscription/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@ package subscription
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/apis/feature"

// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
},
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
20 changes: 20 additions & 0 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import (

"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kref"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1"
Expand All @@ -62,6 +64,9 @@ type Reconciler struct {
// DynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface

// crdLister is used to resolve the ref version
kreferenceResolver *kref.KReferenceResolver

// listers index properties about resources
subscriptionLister listers.SubscriptionLister
channelLister listers.ChannelLister
Expand Down Expand Up @@ -201,6 +206,21 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub
if subscriber.Ref != nil {
subscriber.Ref.Namespace = subscription.Namespace
}

// Resolve the group
if subscriber.Ref != nil && feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) {
var err error
subscriber.Ref, err = r.kreferenceResolver.ResolveGroup(subscriber.Ref)
if err != nil {
logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref",
zap.Error(err),
zap.Any("subscriber", subscriber))
subscription.Status.MarkReferencesNotResolved(subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %v", err)
return pkgreconciler.NewEvent(corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %w", err)
}
logging.FromContext(ctx).Debugw("Group resolved", zap.Any("spec.subscriber.ref", subscriber.Ref))
}

subscriberURI, err := r.destinationResolver.URIFromDestinationV1(ctx, *subscriber, subscription)
if err != nil {
logging.FromContext(ctx).Warnw("Failed to resolve Subscriber",
Expand Down
Loading

0 comments on commit 6adafe5

Please sign in to comment.