Skip to content

Commit

Permalink
Update Sequence to use correct conditions. (#5369)
Browse files Browse the repository at this point in the history
* Update Sequence to use correct conditions.

* lint

* Update go.sum

* Update go.sum
  • Loading branch information
Scott Nichols authored May 11, 2021
1 parent 6893451 commit b199359
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 225 deletions.
5 changes: 5 additions & 0 deletions cmd/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"knative.dev/hack/schema/registry"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)
Expand All @@ -43,6 +44,10 @@ func main() {
registry.Register(&sourcesv1.SinkBinding{})
registry.Register(&sourcesv1.ContainerSource{}) // WARNING: THIS DOES NOT WORK OUT OF THE BOX: See https://github.com/knative/eventing/issues/5353.

// Flows
registry.Register(&flowsv1.Sequence{})
registry.Register(&flowsv1.Parallel{})

if err := commands.New("knative.dev/eventing").Execute(); err != nil {
log.Fatal("Error during command execution: ", err)
}
Expand Down
296 changes: 166 additions & 130 deletions config/core/resources/sequence.yaml

Large diffs are not rendered by default.

34 changes: 18 additions & 16 deletions pkg/apis/flows/v1/sequence_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch

}
for i, c := range channels {
// Mark the Sequence address as the Address of the first channel.
if i == 0 {
ss.setAddress(c.Status.Address)
}

ss.ChannelStatuses[i] = SequenceChannelStatus{
Channel: corev1.ObjectReference{
APIVersion: c.APIVersion,
Expand All @@ -130,20 +135,16 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch
Namespace: c.Namespace,
},
}
// TODO: Once the addressable has a real status to dig through, use that here instead of
// addressable, because it might be addressable but not ready.
address := c.Status.AddressStatus.Address
if address != nil {
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue}

if ready := c.Status.GetCondition(apis.ConditionReady); ready != nil {
ss.ChannelStatuses[i].ReadyCondition = *ready
if !ready.IsTrue() {
allReady = false
}
} else {
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"}
ss.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionUnknown, Reason: "NoReady", Message: "Channel does not have Ready condition"}
allReady = false
}

// Mark the Sequence address as the Address of the first channel.
if i == 0 {
ss.setAddress(address)
}
}
if allReady {
sCondSet.Manage(ss).MarkTrue(SequenceConditionChannelsReady)
Expand All @@ -153,22 +154,23 @@ func (ss *SequenceStatus) PropagateChannelStatuses(channels []*eventingduckv1.Ch
}

func (ss *SequenceStatus) MarkChannelsNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionChannelsReady, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionChannelsReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkSubscriptionsNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionSubscriptionsReady, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionSubscriptionsReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkAddressableNotReady(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionAddressable, reason, messageFormat, messageA...)
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) setAddress(address *duckv1.Addressable) {
if address == nil || address.URL == nil {
sCondSet.Manage(ss).MarkFalse(SequenceConditionAddressable, "emptyAddress", "addressable is nil")
ss.Address = duckv1.Addressable{}
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, "emptyAddress", "addressable is nil")
} else {
ss.AddressStatus.Address = &duckv1.Addressable{URL: address.URL}
ss.Address = duckv1.Addressable{URL: address.URL}
sCondSet.Manage(ss).MarkTrue(SequenceConditionAddressable)
}
}
107 changes: 76 additions & 31 deletions pkg/apis/flows/v1/sequence_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,16 @@ func getChannelable(ready bool) *eventingduckv1.Channelable {
}

if ready {
c.Status.SetConditions([]apis.Condition{{
Type: apis.ConditionReady,
Status: corev1.ConditionTrue,
}})
c.Status.Address = &duckv1.Addressable{URL: URL}
} else {
c.Status.SetConditions([]apis.Condition{{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
}})
}

return &c
Expand Down Expand Up @@ -212,7 +221,7 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) {
}{{
name: "empty",
subs: []*messagingv1.Subscription{},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "empty status",
subs: []*messagingv1.Subscription{{
Expand All @@ -227,19 +236,19 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) {
Status: messagingv1.SubscriptionStatus{},
},
},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one subscription not ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one subscription ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: corev1.ConditionTrue,
}, {
name: "one subscription ready, one not",
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "two subscriptions ready",
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
Expand Down Expand Up @@ -267,19 +276,19 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {
}{{
name: "empty",
channels: []*eventingduckv1.Channelable{},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one channelable not ready",
channels: []*eventingduckv1.Channelable{getChannelable(false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "one channelable ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
want: corev1.ConditionTrue,
}, {
name: "one channelable ready, one not",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
want: corev1.ConditionFalse,
want: corev1.ConditionUnknown,
}, {
name: "two channelables ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
Expand All @@ -293,7 +302,7 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {
got := ps.GetCondition(SequenceConditionChannelsReady).Status
want := test.want
if want != got {
t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got)
t.Errorf("unexpected conditions: want=%q, got=%q", want, got)
}
})
}
Expand Down Expand Up @@ -358,44 +367,80 @@ func TestSequenceReady(t *testing.T) {

func TestSequencePropagateSetAddress(t *testing.T) {
URL := apis.HTTP("example.com")
URL2 := apis.HTTP("another.example.com")
tests := []struct {
name string
address *duckv1.Addressable
want *duckv1.Addressable
wantStatus corev1.ConditionStatus
name string
status SequenceStatus
address *duckv1.Addressable
want duckv1.Addressable
wantStatus corev1.ConditionStatus
wantAddress string
}{{
name: "nil",
status: SequenceStatus{},
address: nil,
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "empty",
status: SequenceStatus{},
address: &duckv1.Addressable{},
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "URL",
address: &duckv1.Addressable{URL: URL},
want: &duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
name: "URL",
status: SequenceStatus{},
address: &duckv1.Addressable{URL: URL},
want: duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
wantAddress: "http://example.com",
}, {
name: "New URL",
status: SequenceStatus{
Address: duckv1.Addressable{
URL: URL2,
},
},
address: &duckv1.Addressable{URL: URL},
want: duckv1.Addressable{URL: URL},
wantStatus: corev1.ConditionTrue,
wantAddress: "http://example.com",
}, {
name: "Clear URL",
status: SequenceStatus{
Address: duckv1.Addressable{
URL: URL,
},
},
address: nil,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}, {
name: "nil",
status: SequenceStatus{},
address: &duckv1.Addressable{URL: nil},
want: nil,
wantStatus: corev1.ConditionFalse,
want: duckv1.Addressable{},
wantStatus: corev1.ConditionUnknown,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := SequenceStatus{}
ps.setAddress(test.address)
got := ps.Address
if diff := cmp.Diff(test.want, got, ignoreAllButTypeAndStatus); diff != "" {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

tt.status.setAddress(tt.address)
got := tt.status.Address
if diff := cmp.Diff(tt.want, got, ignoreAllButTypeAndStatus); diff != "" {
t.Error("unexpected address (-want, +got) =", diff)
}
gotStatus := ps.GetCondition(SequenceConditionAddressable).Status
if test.wantStatus != gotStatus {
t.Errorf("unexpected conditions (-want, +got) = %v %v", test.wantStatus, gotStatus)
gotStatus := tt.status.GetCondition(SequenceConditionAddressable).Status
if tt.wantStatus != gotStatus {
t.Errorf("unexpected conditions (-want, +got) = %v %v", tt.wantStatus, gotStatus)
}
gotAddress := ""
if got.URL != nil {
gotAddress = got.URL.String()
}
if diff := cmp.Diff(tt.wantAddress, gotAddress); diff != "" {
t.Error("unexpected address.url (-want, +got) =", diff)
}
})
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/flows/v1/sequence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ type SequenceStatus struct {
// Matches the Spec.Steps array in the order.
ChannelStatuses []SequenceChannelStatus `json:"channelStatuses"`

// AddressStatus is the starting point to this Sequence. Sending to this
// Address is the starting point to this Sequence. Sending to this
// will target the first subscriber.
// It generally has the form {channel}.{namespace}.svc.{cluster domain name}
duckv1.AddressStatus `json:",inline"`
// +optional
Address duckv1.Addressable `json:"address,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/flows/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b199359

Please sign in to comment.