Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Sequence to use correct conditions. #5369

Merged
merged 4 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"knative.dev/hack/schema/registry"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)
Expand All @@ -42,6 +43,10 @@ func main() {
registry.Register(&sourcesv1.ApiServerSource{})
registry.Register(&sourcesv1.SinkBinding{})

// Flows
registry.Register(&flowsv1.Sequence{})
registry.Register(&flowsv1.Parallel{})

if err := commands.New("knative.dev/eventing").Execute(); err != nil {
log.Fatal("Error during command execution: ", err)
}
Expand Down
296 changes: 166 additions & 130 deletions config/core/resources/sequence.yaml

Large diffs are not rendered by default.

34 changes: 18 additions & 16 deletions pkg/apis/flows/v1/sequence_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch

}
for i, c := range channels {
// Mark the Sequence address as the Address of the first channel.
if i == 0 {
ss.setAddress(c.Status.Address)
}

ss.ChannelStatuses[i] = SequenceChannelStatus{
Channel: corev1.ObjectReference{
APIVersion: c.APIVersion,
Expand All @@ -130,20 +135,16 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch
Namespace: c.Namespace,
},
}
// TODO: Once the addressable has a real status to dig through, use that here instead of
// addressable, because it might be addressable but not ready.
address := c.Status.AddressStatus.Address
if address != nil {
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue}

if ready := c.Status.GetCondition(apis.ConditionReady); ready != nil {
ss.ChannelStatuses[i].ReadyCondition = *ready
if !ready.IsTrue() {
allReady = false
}
} else {
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"}
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionUnknown, Reason: "NoReady", Message: "Channel does not have Ready condition"}
allReady = false
}

// Mark the Sequence address as the Address of the first channel.
if i == 0 {
ss.setAddress(address)
}
}
if allReady {
sCondSet.Manage(ss).MarkTrue(SequenceConditionChannelsReady)
Expand All @@ -153,22 +154,23 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch
}

func (ss *SequenceStatus) MarkChannelsNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionChannelsReady, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionChannelsReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkSubscriptionsNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionSubscriptionsReady, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionSubscriptionsReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkAddressableNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionAddressable, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) setAddress(address *duckv1.Addressable) {
if address == nil || address.URL == nil {
sCondSet.Manage(ss).MarkFalse(SequenceConditionAddressable, "emptyAddress", "addressable is nil")
ss.Address = duckv1.Addressable{}
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, "emptyAddress", "addressable is nil")
} else {
ss.AddressStatus.Address = &duckv1.Addressable{URL: address.URL}
ss.Address = duckv1.Addressable{URL: address.URL}
sCondSet.Manage(ss).MarkTrue(SequenceConditionAddressable)
}
}
107 changes: 76 additions & 31 deletions pkg/apis/flows/v1/sequence_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,16 @@ func getChannelable(ready bool) *eventingduckv1.Channelable {
}

if ready {
c.Status.SetConditions([]apis.Condition{{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
}})
c.Status.Address = &duckv1.Addressable{URL: URL}
} else {
c.Status.SetConditions([]apis.Condition{{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
}})
}

return &c
Expand Down Expand Up @@ -212,7 +221,7 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) {
}{{
name: "empty",
subs: []*messagingv1.Subscription{},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "empty status",
subs: []*messagingv1.Subscription{{
Expand All @@ -227,19 +236,19 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) {
Status: messagingv1.SubscriptionStatus{},
},
},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one subscription not ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one subscription ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: corev1.ConditionTrue,
}, {
name: "one subscription ready, one not",
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "two subscriptions ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
Expand Down Expand Up @@ -267,19 +276,19 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {
}{{
name: "empty",
channels: []*eventingduckv1.Channelable{},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one channelable not ready",
channels: []*eventingduckv1.Channelable{getChannelable(false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one channelable ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
want: corev1.ConditionTrue,
}, {
name: "one channelable ready, one not",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "two channelables ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
Expand All @@ -293,7 +302,7 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {
got := ps.GetCondition(SequenceConditionChannelsReady).Status
want := test.want
if want != got {
t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got)
t.Errorf("unexpected conditions: want=%q, got=%q", want, got)
}
})
}
Expand Down Expand Up @@ -358,44 +367,80 @@ func TestSequenceReady(t *testing.T) {

func TestSequencePropagateSetAddress(t *testing.T) {
URL := apis.HTTP("example.com")
URL2 := apis.HTTP("another.example.com")
tests := []struct {
name string
address *duckv1.Addressable
want *duckv1.Addressable
wantStatus corev1.ConditionStatus
name string
status SequenceStatus
address *duckv1.Addressable
want duckv1.Addressable
wantStatus corev1.ConditionStatus
wantAddress string
}{{
name: "nil",
status: SequenceStatus{},
address: nil,
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "empty",
status: SequenceStatus{},
address: &duckv1.Addressable{},
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "URL",
address: &duckv1.Addressable{URL: URL},
want: &duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
name: "URL",
status: SequenceStatus{},
address: &duckv1.Addressable{URL: URL},
want: duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
wantAddress: "http://example.com",
}, {
name: "New URL",
status: SequenceStatus{
Address: duckv1.Addressable{
URL: URL2,
},
},
address: &duckv1.Addressable{URL: URL},
want: duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
wantAddress: "http://example.com",
}, {
name: "Clear URL",
status: SequenceStatus{
Address: duckv1.Addressable{
URL: URL,
},
},
address: nil,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "nil",
status: SequenceStatus{},
address: &duckv1.Addressable{URL: nil},
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := SequenceStatus{}
ps.setAddress(test.address)
got := ps.Address
if diff := cmp.Diff(test.want, got, ignoreAllButTypeAndStatus); diff != "" {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

tt.status.setAddress(tt.address)
got := tt.status.Address
if diff := cmp.Diff(tt.want, got, ignoreAllButTypeAndStatus); diff != "" {
t.Error("unexpected address (-want, +got) =", diff)
}
gotStatus := ps.GetCondition(SequenceConditionAddressable).Status
if test.wantStatus != gotStatus {
t.Errorf("unexpected conditions (-want, +got) = %v %v", test.wantStatus, gotStatus)
gotStatus := tt.status.GetCondition(SequenceConditionAddressable).Status
if tt.wantStatus != gotStatus {
t.Errorf("unexpected conditions (-want, +got) = %v %v", tt.wantStatus, gotStatus)
}
gotAddress := ""
if got.URL != nil {
gotAddress = got.URL.String()
}
if diff := cmp.Diff(tt.wantAddress, gotAddress); diff != "" {
t.Error("unexpected address.url (-want, +got) =", diff)
}
})
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/flows/v1/sequence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ type SequenceStatus struct {
// Matches the Spec.Steps array in the order.
ChannelStatuses []SequenceChannelStatus `json:"channelStatuses"`

// AddressStatus is the starting point to this Sequence. Sending to this
// Address is the starting point to this Sequence. Sending to this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure I understand correctly. This is equivalent because we inlined AddressStatus before and instead of inlining you changed it to be explicit? Is there a reason why this is not a pointer like what AddressStatus has?

https://github.com/knative/pkg/blob/main/apis/duck/v1/addressable_types.go#L64

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am doing what Broker does, we have both styles and because the inner url is a pointer, it seems like the wrapper to the addressable could be allowed to be not a pointer?

We should pick a style and make it consistent. The root issue might be https://github.com/knative/pkg/blob/main/apis/duck/v1/addressable_types.go#L64 has URL as not optional

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, should've been clearer. I saw there being inconsistencies also. But just wanted to make sure that my understanding is that there are no changes to wire format. We should probably vet these to make sure they are consistent, and make sure that we comment the duck properly since it says there to embed it like this (well, that's how I read it still).

// will target the first subscriber.
// It generally has the form {channel}.{namespace}.svc.{cluster domain name}
duckv1.AddressStatus `json:",inline"`
// +optional
Address duckv1.Addressable `json:"address,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/flows/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading