Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate channel reconciler to use messaging.* v1 resources #3871

Merged
merged 4 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 2 additions & 50 deletions pkg/apis/duck/v1alpha1/channelable_combined_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
Expand All @@ -33,7 +32,7 @@ import (
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ChannelableCombined is a skeleton type wrapping Subscribable and Addressable of
// v1alpha1, v1beta1 and v1 duck types. This is not to be used by resource writers and is
// v1alpha1 and v1beta1 duck types. This is not to be used by resource writers and is
// only used by Subscription Controller to synthesize patches and read the Status
// of the Channelable Resources.
// This is not a real resource.
Expand All @@ -56,17 +55,8 @@ type ChannelableCombinedSpec struct {
eventingduckv1beta1.SubscribableSpec `json:",inline"`

// DeliverySpec contains options controlling the event delivery
// for the v1beta1 spec compatibility.
// +optional
Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"`

// SubscribableSpecv1 is for the v1 spec compatibility.
SubscribableSpecv1 eventingduckv1.SubscribableSpec `json:",inline"`

// DeliverySpecv1 contains options controlling the event delivery
// for the v1 spec compatibility.
// +optional
Deliveryv1 *eventingduckv1.DeliverySpec `json:"deliveryv1,omitempty"`
}

// ChannelableStatus contains the Status of a Channelable object.
Expand All @@ -81,8 +71,6 @@ type ChannelableCombinedStatus struct {
SubscribableTypeStatus `json:",inline"`
// SubscribableStatus is the v1beta1 part of the Subscribers status.
eventingduckv1beta1.SubscribableStatus `json:",inline"`
// SubscribableStatusv1 is the v1 part of the Subscribers status.
SubscribableStatusv1 eventingduckv1.SubscribableStatus `json:",inline"`
// ErrorChannel is set by the channel when it supports native error handling via a channel
// +optional
ErrorChannel *corev1.ObjectReference `json:"errorChannel,omitempty"`
Expand Down Expand Up @@ -125,23 +113,8 @@ func (c *ChannelableCombined) Populate() {
ReplyURI: apis.HTTP("sink2"),
}},
}
c.Spec.SubscribableSpecv1 = eventingduckv1.SubscribableSpec{
// Populate ALL fields
Subscribers: []eventingduckv1.SubscriberSpec{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call1"),
ReplyURI: apis.HTTP("sink2"),
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 2,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}},
}
retry := int32(5)
linear := eventingduckv1beta1.BackoffPolicyLinear
linearv1 := eventingduckv1.BackoffPolicyLinear
delay := "5s"
deadLetterSink := duckv1.Destination{
Ref: &duckv1.KReference{
Expand All @@ -158,12 +131,6 @@ func (c *ChannelableCombined) Populate() {
BackoffPolicy: &linear,
BackoffDelay: &delay,
}
c.Spec.Deliveryv1 = &eventingduckv1.DeliverySpec{
DeadLetterSink: &deadLetterSink,
Retry: &retry,
BackoffPolicy: &linearv1,
BackoffDelay: &delay,
}
subscribers := []eventingduckv1beta1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Expand All @@ -175,17 +142,6 @@ func (c *ChannelableCombined) Populate() {
Ready: corev1.ConditionFalse,
Message: "Some message",
}}
subscribersv1 := []eventingduckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}}
c.Status = ChannelableCombinedStatus{
AddressStatus: v1alpha1.AddressStatus{
Address: &v1alpha1.Addressable{
Expand All @@ -202,13 +158,9 @@ func (c *ChannelableCombined) Populate() {
SubscribableStatus: eventingduckv1beta1.SubscribableStatus{
Subscribers: subscribers,
},
SubscribableStatusv1: eventingduckv1.SubscribableStatus{
Subscribers: subscribersv1,
},
SubscribableTypeStatus: SubscribableTypeStatus{
SubscribableStatus: &SubscribableStatus{
Subscribers: subscribers,
Subscribersv1: subscribersv1,
Subscribers: subscribers,
},
},
}
Expand Down
54 changes: 0 additions & 54 deletions pkg/apis/duck/v1alpha1/channelable_combined_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -45,7 +44,6 @@ func TestChannelableCombinedPopulate(t *testing.T) {

retry := int32(5)
linear := eventingduckv1beta1.BackoffPolicyLinear
linearv1 := eventingduckv1.BackoffPolicyLinear
delay := "5s"
want := &ChannelableCombined{
Spec: ChannelableCombinedSpec{
Expand All @@ -63,20 +61,6 @@ func TestChannelableCombinedPopulate(t *testing.T) {
ReplyURI: apis.HTTP("sink2"),
}},
},
SubscribableSpecv1: eventingduckv1.SubscribableSpec{
// Populate ALL fields
Subscribers: []eventingduckv1.SubscriberSpec{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call1"),
ReplyURI: apis.HTTP("sink2"),
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 2,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}},
},
SubscribableTypeSpec: SubscribableTypeSpec{
Subscribable: &Subscribable{
Subscribers: []SubscriberSpec{{
Expand Down Expand Up @@ -106,20 +90,6 @@ func TestChannelableCombinedPopulate(t *testing.T) {
BackoffPolicy: &linear,
BackoffDelay: &delay,
},
Deliveryv1: &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "aname",
},
URI: &apis.URL{
Scheme: "http",
Host: "test-error-domain",
},
},
Retry: &retry,
BackoffPolicy: &linearv1,
BackoffDelay: &delay,
},
},

Status: ChannelableCombinedStatus{
Expand Down Expand Up @@ -148,19 +118,6 @@ func TestChannelableCombinedPopulate(t *testing.T) {
Message: "Some message",
}},
},
SubscribableStatusv1: eventingduckv1.SubscribableStatus{
Subscribers: []eventingduckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
},
SubscribableTypeStatus: SubscribableTypeStatus{
SubscribableStatus: &SubscribableStatus{
Subscribers: []eventingduckv1beta1.SubscriberStatus{{
Expand All @@ -174,17 +131,6 @@ func TestChannelableCombinedPopulate(t *testing.T) {
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
Subscribersv1: []eventingduckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
},
},
},
Expand Down
27 changes: 0 additions & 27 deletions pkg/apis/duck/v1alpha1/subscribable_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
)

Expand Down Expand Up @@ -62,8 +61,6 @@ type SubscriberSpec struct {
DeadLetterSinkURI *apis.URL `json:"deadLetterSink,omitempty"`
// +optional
Delivery *duckv1beta1.DeliverySpec `json:"delivery,omitempty"`
// +optional
Deliveryv1 *duckv1.DeliverySpec `json:"deliveryv1,omitempty"`
}

// SubscribableStatus is the schema for the subscribable's status portion of the status
Expand All @@ -73,11 +70,6 @@ type SubscribableStatus struct {
// +patchMergeKey=uid
// +patchStrategy=merge
Subscribers []duckv1beta1.SubscriberStatus `json:"subscribers,omitempty" patchStrategy:"merge" patchMergeKey:"uid"`

// This is the list of subscription's statuses for this channel.
// +patchMergeKey=uid
// +patchStrategy=merge
Subscribersv1 []duckv1.SubscriberStatus `json:"subscribersv1,omitempty" patchStrategy:"merge" patchMergeKey:"uid"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -144,14 +136,6 @@ func (s *SubscribableTypeStatus) AddSubscriberToSubscribableStatus(subscriberSta
s.SubscribableStatus.Subscribers = subscribers
}

// AddSubscriberToSubscribableStatus method is a Helper method for type SubscribableTypeStatus, if Subscribable Status needs to be appended
// with Subscribers, use this function, so that the value is reflected in both the duplicate fields residing
// in SubscribableTypeStatus
func (s *SubscribableTypeStatus) AddSubscriberV1ToSubscribableStatus(subscriberStatus duckv1.SubscriberStatus) {
subscribersv1 := append(s.GetSubscribableTypeStatus().Subscribersv1, subscriberStatus)
s.SubscribableStatus.Subscribersv1 = subscribersv1
}

// GetFullType implements duck.Implementable
func (s *Subscribable) GetFullType() duck.Populatable {
return &SubscribableType{}
Expand Down Expand Up @@ -186,17 +170,6 @@ func (c *SubscribableType) Populate() {
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
Subscribersv1: []duckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
})
}

Expand Down
44 changes: 22 additions & 22 deletions pkg/apis/duck/v1alpha1/subscribable_types_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ func (source *SubscriberSpec) ConvertTo(ctx context.Context, obj apis.Convertibl
sink.UID = source.UID
sink.Generation = source.Generation
sink.SubscriberURI = source.SubscriberURI
if source.Deliveryv1 != nil {
sink.Delivery = source.Deliveryv1
if source.Delivery != nil {
sink.Delivery = &eventingduckv1.DeliverySpec{}
if err := source.Delivery.ConvertTo(ctx, sink.Delivery); err != nil {
return err
}
} else {
// If however, there's a Deprecated DeadLetterSinkURI, convert that up
// to DeliverySpec.
Expand Down Expand Up @@ -139,14 +142,12 @@ func (source *SubscribableTypeStatus) ConvertTo(ctx context.Context, obj apis.Co
}
case *eventingduckv1.SubscribableStatus:
if source.SubscribableStatus != nil &&
len(source.SubscribableStatus.Subscribersv1) > 0 {
sink.Subscribers = make([]eventingduckv1.SubscriberStatus, len(source.SubscribableStatus.Subscribersv1))
for i, ss := range source.SubscribableStatus.Subscribersv1 {
sink.Subscribers[i] = eventingduckv1.SubscriberStatus{
UID: ss.UID,
ObservedGeneration: ss.ObservedGeneration,
Ready: ss.Ready,
Message: ss.Message,
len(source.SubscribableStatus.Subscribers) > 0 {
sink.Subscribers = make([]eventingduckv1.SubscriberStatus, len(source.SubscribableStatus.Subscribers))
for i, ss := range source.SubscribableStatus.Subscribers {
sink.Subscribers[i] = eventingduckv1.SubscriberStatus{}
if err := ss.ConvertTo(ctx, &sink.Subscribers[i]); err != nil {
return err
}
}
}
Expand Down Expand Up @@ -228,16 +229,17 @@ func (sink *SubscriberSpec) ConvertFrom(ctx context.Context, obj apis.Convertibl
sink.Delivery = source.Delivery
sink.DeadLetterSinkURI = deadLetterSinkURI
case *eventingduckv1.SubscriberSpec:
var deadLetterSinkURI *apis.URL
if source.Delivery != nil && source.Delivery.DeadLetterSink != nil {
deadLetterSinkURI = source.Delivery.DeadLetterSink.URI
}
sink.UID = source.UID
sink.Generation = source.Generation
sink.SubscriberURI = source.SubscriberURI
sink.ReplyURI = source.ReplyURI
sink.Deliveryv1 = source.Delivery
sink.DeadLetterSinkURI = deadLetterSinkURI
if source.Delivery != nil {
sink.Delivery = &eventingduckv1beta1.DeliverySpec{}
if err := sink.Delivery.ConvertFrom(ctx, source.Delivery); err != nil {
return err
}
sink.DeadLetterSinkURI = source.Delivery.DeadLetterSink.URI
}
default:
return fmt.Errorf("unknown version, got: %T", sink)
}
Expand All @@ -264,14 +266,12 @@ func (sink *SubscribableTypeStatus) ConvertFrom(ctx context.Context, obj apis.Co
case *eventingduckv1.SubscribableStatus:
if len(source.Subscribers) > 0 {
sink.SubscribableStatus = &SubscribableStatus{
Subscribersv1: make([]eventingduckv1.SubscriberStatus, len(source.Subscribers)),
Subscribers: make([]eventingduckv1beta1.SubscriberStatus, len(source.Subscribers)),
}
for i, ss := range source.Subscribers {
sink.SubscribableStatus.Subscribersv1[i] = eventingduckv1.SubscriberStatus{
UID: ss.UID,
ObservedGeneration: ss.ObservedGeneration,
Ready: ss.Ready,
Message: ss.Message,
sink.SubscribableStatus.Subscribers[i] = eventingduckv1beta1.SubscriberStatus{}
if err := sink.SubscribableStatus.Subscribers[i].ConvertFrom(ctx, &ss); err != nil {
return err
}
}
}
Expand Down
Loading